1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39

    
40
from ganeti import ssh
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti import hypervisor
44
from ganeti import locking
45
from ganeti import constants
46
from ganeti import objects
47
from ganeti import serializer
48
from ganeti import ssconf
49
from ganeti import uidpool
50
from ganeti import compat
51
from ganeti import masterd
52
from ganeti import netutils
53

    
54
import ganeti.masterd.instance # pylint: disable-msg=W0611
55

    
56

    
57
# Modifiable default values; need to define these here before the
58
# actual LUs
59

    
60
def _EmptyList():
61
  """Returns an empty list.
62

63
  """
64
  return []
65

    
66

    
67
def _EmptyDict():
68
  """Returns an empty dict.
69

70
  """
71
  return {}
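# Note added for clarity: callables such as _EmptyList/_EmptyDict are used as
# _OP_PARAMS defaults so that every opcode instance gets a fresh object rather
# than sharing one mutable default; a hypothetical declaration would be
#   ("nics", _EmptyList, _TList)
# instead of ("nics", [], _TList).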
72

    
73

    
74
#: The without-default default value
75
_NoDefault = object()
76

    
77

    
78
#: The no-type (value too complex to check in the type system)
79
_NoType = object()
80

    
81

    
82
# Some basic types
83
def _TNotNone(val):
84
  """Checks if the given value is not None.
85

86
  """
87
  return val is not None
88

    
89

    
90
def _TNone(val):
91
  """Checks if the given value is None.
92

93
  """
94
  return val is None
95

    
96

    
97
def _TBool(val):
98
  """Checks if the given value is a boolean.
99

100
  """
101
  return isinstance(val, bool)
102

    
103

    
104
def _TInt(val):
105
  """Checks if the given value is an integer.
106

107
  """
108
  return isinstance(val, int)
109

    
110

    
111
def _TFloat(val):
112
  """Checks if the given value is a float.
113

114
  """
115
  return isinstance(val, float)
116

    
117

    
118
def _TString(val):
119
  """Checks if the given value is a string.
120

121
  """
122
  return isinstance(val, basestring)
123

    
124

    
125
def _TTrue(val):
126
  """Checks if a given value evaluates to a boolean True value.
127

128
  """
129
  return bool(val)
130

    
131

    
132
def _TElemOf(target_list):
133
  """Builds a function that checks if a given value is a member of a list.
134

135
  """
136
  return lambda val: val in target_list
137

    
138

    
139
# Container types
140
def _TList(val):
141
  """Checks if the given value is a list.
142

143
  """
144
  return isinstance(val, list)
145

    
146

    
147
def _TDict(val):
148
  """Checks if the given value is a dictionary.
149

150
  """
151
  return isinstance(val, dict)
152

    
153

    
154
# Combinator types
155
def _TAnd(*args):
156
  """Combine multiple functions using an AND operation.
157

158
  """
159
  def fn(val):
160
    return compat.all(t(val) for t in args)
161
  return fn
162

    
163

    
164
def _TOr(*args):
165
  """Combine multiple functions using an AND operation.
166

167
  """
168
  def fn(val):
169
    return compat.any(t(val) for t in args)
170
  return fn
171

    
172

    
173
# Type aliases
174

    
175
#: a non-empty string
176
_TNonEmptyString = _TAnd(_TString, _TTrue)
177

    
178

    
179
#: a maybe non-empty string
180
_TMaybeString = _TOr(_TNonEmptyString, _TNone)
181

    
182

    
183
#: a maybe boolean (bool or none)
184
_TMaybeBool = _TOr(_TBool, _TNone)
185

    
186

    
187
#: a positive integer
188
_TPositiveInt = _TAnd(_TInt, lambda v: v >= 0)
189

    
190
#: a strictly positive integer
191
_TStrictPositiveInt = _TAnd(_TInt, lambda v: v > 0)
192

    
193

    
194
def _TListOf(my_type):
195
  """Checks if a given value is a list with all elements of the same type.
196

197
  """
198
  return _TAnd(_TList,
199
               lambda lst: compat.all(my_type(v) for v in lst))
200

    
201

    
202
def _TDictOf(key_type, val_type):
203
  """Checks a dict type for the type of its key/values.
204

205
  """
206
  return _TAnd(_TDict,
207
               lambda my_dict: (compat.all(key_type(v) for v in my_dict.keys())
208
                                and compat.all(val_type(v)
209
                                               for v in my_dict.values())))
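# Illustrative composition of the checkers above (added example, not part of
# the original module); each call returns a boolean:
#
#   _TListOf(_TNonEmptyString)(["node1", "node2"])      # True
#   _TListOf(_TNonEmptyString)(["node1", ""])           # False ("" fails _TTrue)
#   _TDictOf(_TNonEmptyString, _TInt)({"memory": 128})  # True
#   _TOr(_TNone, _TInt)(None)                           # True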
210

    
211

    
212
# Common opcode attributes
213

    
214
#: output fields for a query operation
215
_POutputFields = ("output_fields", _NoDefault, _TListOf(_TNonEmptyString))
216

    
217

    
218
#: the shutdown timeout
219
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
220
                     _TPositiveInt)
221

    
222
#: the force parameter
223
_PForce = ("force", False, _TBool)
224

    
225
#: a required instance name (for single-instance LUs)
226
_PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString)
227

    
228

    
229
#: a required node name (for single-node LUs)
230
_PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
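# Illustrative sketch (added; "os_type" here is a hypothetical parameter): an
# LU combines the shared definitions above with its own (name, default, check)
# tuples, e.g.
#
#   _OP_PARAMS = [
#     _PInstanceName,
#     _PForce,
#     ("os_type", None, _TMaybeString),
#     ]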
231

    
232

    
233
# End types
234
class LogicalUnit(object):
235
  """Logical Unit base class.
236

237
  Subclasses must follow these rules:
238
    - implement ExpandNames
239
    - implement CheckPrereq (except when tasklets are used)
240
    - implement Exec (except when tasklets are used)
241
    - implement BuildHooksEnv
242
    - redefine HPATH and HTYPE
243
    - optionally redefine their run requirements:
244
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
245

246
  Note that all commands require root permissions.
247

248
  @ivar dry_run_result: the value (if any) that will be returned to the caller
249
      in dry-run mode (signalled by opcode dry_run parameter)
250
  @cvar _OP_PARAMS: a list of opcode attributes, their default values
251
      they should get if not already defined, and types they must match
252

253
  """
254
  HPATH = None
255
  HTYPE = None
256
  _OP_PARAMS = []
257
  REQ_BGL = True
258

    
259
  def __init__(self, processor, op, context, rpc):
260
    """Constructor for LogicalUnit.
261

262
    This needs to be overridden in derived classes in order to check op
263
    validity.
264

265
    """
266
    self.proc = processor
267
    self.op = op
268
    self.cfg = context.cfg
269
    self.context = context
270
    self.rpc = rpc
271
    # Dicts used to declare locking needs to mcpu
272
    self.needed_locks = None
273
    self.acquired_locks = {}
274
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
275
    self.add_locks = {}
276
    self.remove_locks = {}
277
    # Used to force good behavior when calling helper functions
278
    self.recalculate_locks = {}
279
    self.__ssh = None
280
    # logging
281
    self.Log = processor.Log # pylint: disable-msg=C0103
282
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
283
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
284
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
285
    # support for dry-run
286
    self.dry_run_result = None
287
    # support for generic debug attribute
288
    if (not hasattr(self.op, "debug_level") or
289
        not isinstance(self.op.debug_level, int)):
290
      self.op.debug_level = 0
291

    
292
    # Tasklets
293
    self.tasklets = None
294

    
295
    # The new kind-of-type-system
296
    op_id = self.op.OP_ID
297
    for attr_name, aval, test in self._OP_PARAMS:
298
      if not hasattr(op, attr_name):
299
        if aval == _NoDefault:
300
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
301
                                     (op_id, attr_name), errors.ECODE_INVAL)
302
        else:
303
          if callable(aval):
304
            dval = aval()
305
          else:
306
            dval = aval
307
          setattr(self.op, attr_name, dval)
308
      attr_val = getattr(op, attr_name)
309
      if test == _NoType:
310
        # no tests here
311
        continue
312
      if not callable(test):
313
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
314
                                     " given type is not a proper type (%s)" %
315
                                     (op_id, attr_name, test))
316
      if not test(attr_val):
317
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
318
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
319
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
320
                                   (op_id, attr_name), errors.ECODE_INVAL)
321

    
322
    self.CheckArguments()
323

    
324
  def __GetSSH(self):
325
    """Returns the SshRunner object
326

327
    """
328
    if not self.__ssh:
329
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
330
    return self.__ssh
331

    
332
  ssh = property(fget=__GetSSH)
333

    
334
  def CheckArguments(self):
335
    """Check syntactic validity for the opcode arguments.
336

337
    This method is for doing a simple syntactic check and ensuring
338
    validity of opcode parameters, without any cluster-related
339
    checks. While the same can be accomplished in ExpandNames and/or
340
    CheckPrereq, doing these separately is better because:
341

342
      - ExpandNames is left as purely a lock-related function
343
      - CheckPrereq is run after we have acquired locks (and possibly
344
        waited for them)
345

346
    The function is allowed to change the self.op attribute so that
347
    later methods can no longer worry about missing parameters.
348

349
    """
350
    pass
351

    
352
  def ExpandNames(self):
353
    """Expand names for this LU.
354

355
    This method is called before starting to execute the opcode, and it should
356
    update all the parameters of the opcode to their canonical form (e.g. a
357
    short node name must be fully expanded after this method has successfully
358
    completed). This way locking, hooks, logging, etc. can work correctly.
359

360
    LUs which implement this method must also populate the self.needed_locks
361
    member, as a dict with lock levels as keys, and a list of needed lock names
362
    as values. Rules:
363

364
      - use an empty dict if you don't need any lock
365
      - if you don't need any lock at a particular level omit that level
366
      - don't put anything for the BGL level
367
      - if you want all locks at a level use locking.ALL_SET as a value
368

369
    If you need to share locks (rather than acquire them exclusively) at one
370
    level you can modify self.share_locks, setting a true value (usually 1) for
371
    that level. By default locks are not shared.
372

373
    This function can also define a list of tasklets, which then will be
374
    executed in order instead of the usual LU-level CheckPrereq and Exec
375
    functions, if those are not defined by the LU.
376

377
    Examples::
378

379
      # Acquire all nodes and one instance
380
      self.needed_locks = {
381
        locking.LEVEL_NODE: locking.ALL_SET,
382
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
383
      }
384
      # Acquire just two nodes
385
      self.needed_locks = {
386
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
387
      }
388
      # Acquire no locks
389
      self.needed_locks = {} # No, you can't leave it to the default value None
390

391
    """
392
    # The implementation of this method is mandatory only if the new LU is
393
    # concurrent, so that old LUs don't need to be changed all at the same
394
    # time.
395
    if self.REQ_BGL:
396
      self.needed_locks = {} # Exclusive LUs don't need locks.
397
    else:
398
      raise NotImplementedError
399

    
400
  def DeclareLocks(self, level):
401
    """Declare LU locking needs for a level
402

403
    While most LUs can just declare their locking needs at ExpandNames time,
404
    sometimes there's the need to calculate some locks after having acquired
405
    the ones before. This function is called just before acquiring locks at a
406
    particular level, but after acquiring the ones at lower levels, and permits
407
    such calculations. It can be used to modify self.needed_locks, and by
408
    default it does nothing.
409

410
    This function is only called if you have something already set in
411
    self.needed_locks for the level.
412

413
    @param level: Locking level which is going to be locked
414
    @type level: member of ganeti.locking.LEVELS
415

416
    """
417

    
418
  def CheckPrereq(self):
419
    """Check prerequisites for this LU.
420

421
    This method should check that the prerequisites for the execution
422
    of this LU are fulfilled. It can do internode communication, but
423
    it should be idempotent - no cluster or system changes are
424
    allowed.
425

426
    The method should raise errors.OpPrereqError in case something is
427
    not fulfilled. Its return value is ignored.
428

429
    This method should also update all the parameters of the opcode to
430
    their canonical form if it hasn't been done by ExpandNames before.
431

432
    """
433
    if self.tasklets is not None:
434
      for (idx, tl) in enumerate(self.tasklets):
435
        logging.debug("Checking prerequisites for tasklet %s/%s",
436
                      idx + 1, len(self.tasklets))
437
        tl.CheckPrereq()
438
    else:
439
      pass
440

    
441
  def Exec(self, feedback_fn):
442
    """Execute the LU.
443

444
    This method should implement the actual work. It should raise
445
    errors.OpExecError for failures that are somewhat dealt with in
446
    code, or expected.
447

448
    """
449
    if self.tasklets is not None:
450
      for (idx, tl) in enumerate(self.tasklets):
451
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
452
        tl.Exec(feedback_fn)
453
    else:
454
      raise NotImplementedError
455

    
456
  def BuildHooksEnv(self):
457
    """Build hooks environment for this LU.
458

459
    This method should return a three-element tuple consisting of: a dict
460
    containing the environment that will be used for running the
461
    specific hook for this LU, a list of node names on which the hook
462
    should run before the execution, and a list of node names on which
463
    the hook should run after the execution.
464

465
    The keys of the dict must not be prefixed with 'GANETI_', as this will
466
    be handled in the hooks runner. Also note additional keys will be
467
    added by the hooks runner. If the LU doesn't define any
468
    environment, an empty dict (and not None) should be returned.
469

470
    If there are no nodes to return, use an empty list (and not None).
471

472
    Note that if the HPATH for a LU class is None, this function will
473
    not be called.
474

475
    """
476
    raise NotImplementedError
477

    
478
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
479
    """Notify the LU about the results of its hooks.
480

481
    This method is called every time a hooks phase is executed, and notifies
482
    the Logical Unit about the hooks' result. The LU can then use it to alter
483
    its result based on the hooks.  By default the method does nothing and the
484
    previous result is passed back unchanged, but any LU can define it if it
485
    wants to use the local cluster hook-scripts somehow.
486

487
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
488
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
489
    @param hook_results: the results of the multi-node hooks rpc call
490
    @param feedback_fn: function used to send feedback back to the caller
491
    @param lu_result: the previous Exec result this LU had, or None
492
        in the PRE phase
493
    @return: the new Exec result, based on the previous result
494
        and hook results
495

496
    """
497
    # API must be kept, thus we ignore the unused-argument and "could
499
    # be a function" warnings
499
    # pylint: disable-msg=W0613,R0201
500
    return lu_result
501

    
502
  def _ExpandAndLockInstance(self):
503
    """Helper function to expand and lock an instance.
504

505
    Many LUs that work on an instance take its name in self.op.instance_name
506
    and need to expand it and then declare the expanded name for locking. This
507
    function does it, and then updates self.op.instance_name to the expanded
508
    name. It also initializes needed_locks as a dict, if this hasn't been done
509
    before.
510

511
    """
512
    if self.needed_locks is None:
513
      self.needed_locks = {}
514
    else:
515
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
516
        "_ExpandAndLockInstance called with instance-level locks set"
517
    self.op.instance_name = _ExpandInstanceName(self.cfg,
518
                                                self.op.instance_name)
519
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
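    # Typical usage (added illustration, assumed rather than taken from this
    # file): a single-instance LU calls this helper from ExpandNames:
    #   def ExpandNames(self):
    #     self._ExpandAndLockInstance()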
520

    
521
  def _LockInstancesNodes(self, primary_only=False):
522
    """Helper function to declare instances' nodes for locking.
523

524
    This function should be called after locking one or more instances to lock
525
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
526
    with all primary or secondary nodes for instances already locked and
527
    present in self.needed_locks[locking.LEVEL_INSTANCE].
528

529
    It should be called from DeclareLocks, and for safety only works if
530
    self.recalculate_locks[locking.LEVEL_NODE] is set.
531

532
    In the future it may grow parameters to just lock some instance's nodes, or
533
    to just lock primary or secondary nodes, if needed.
534

535
    It should be called in DeclareLocks in a way similar to::
536

537
      if level == locking.LEVEL_NODE:
538
        self._LockInstancesNodes()
539

540
    @type primary_only: boolean
541
    @param primary_only: only lock primary nodes of locked instances
542

543
    """
544
    assert locking.LEVEL_NODE in self.recalculate_locks, \
545
      "_LockInstancesNodes helper function called with no nodes to recalculate"
546

    
547
    # TODO: check if we've really been called with the instance locks held
548

    
549
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
550
    # future we might want to have different behaviors depending on the value
551
    # of self.recalculate_locks[locking.LEVEL_NODE]
552
    wanted_nodes = []
553
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
554
      instance = self.context.cfg.GetInstanceInfo(instance_name)
555
      wanted_nodes.append(instance.primary_node)
556
      if not primary_only:
557
        wanted_nodes.extend(instance.secondary_nodes)
558

    
559
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
560
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
561
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
562
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
563

    
564
    del self.recalculate_locks[locking.LEVEL_NODE]
565

    
566

    
567
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
568
  """Simple LU which runs no hooks.
569

570
  This LU is intended as a parent for other LogicalUnits which will
571
  run no hooks, in order to reduce duplicate code.
572

573
  """
574
  HPATH = None
575
  HTYPE = None
576

    
577
  def BuildHooksEnv(self):
578
    """Empty BuildHooksEnv for NoHooksLu.
579

580
    This just raises an error.
581

582
    """
583
    assert False, "BuildHooksEnv called for NoHooksLUs"
584

    
585

    
586
class Tasklet:
587
  """Tasklet base class.
588

589
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
590
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
591
  tasklets know nothing about locks.
592

593
  Subclasses must follow these rules:
594
    - Implement CheckPrereq
595
    - Implement Exec
596

597
  """
598
  def __init__(self, lu):
599
    self.lu = lu
600

    
601
    # Shortcuts
602
    self.cfg = lu.cfg
603
    self.rpc = lu.rpc
604

    
605
  def CheckPrereq(self):
606
    """Check prerequisites for this tasklets.
607

608
    This method should check whether the prerequisites for the execution of
609
    this tasklet are fulfilled. It can do internode communication, but it
610
    should be idempotent - no cluster or system changes are allowed.
611

612
    The method should raise errors.OpPrereqError in case something is not
613
    fulfilled. Its return value is ignored.
614

615
    This method should also update all parameters to their canonical form if it
616
    hasn't been done before.
617

618
    """
619
    pass
620

    
621
  def Exec(self, feedback_fn):
622
    """Execute the tasklet.
623

624
    This method should implement the actual work. It should raise
625
    errors.OpExecError for failures that are somewhat dealt with in code, or
626
    expected.
627

628
    """
629
    raise NotImplementedError
630

    
631

    
632
def _GetWantedNodes(lu, nodes):
633
  """Returns list of checked and expanded node names.
634

635
  @type lu: L{LogicalUnit}
636
  @param lu: the logical unit on whose behalf we execute
637
  @type nodes: list
638
  @param nodes: list of node names or None for all nodes
639
  @rtype: list
640
  @return: the list of nodes, sorted
641
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
642

643
  """
644
  if not nodes:
645
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
646
      " non-empty list of nodes whose name is to be expanded.")
647

    
648
  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
649
  return utils.NiceSort(wanted)
650

    
651

    
652
def _GetWantedInstances(lu, instances):
653
  """Returns list of checked and expanded instance names.
654

655
  @type lu: L{LogicalUnit}
656
  @param lu: the logical unit on whose behalf we execute
657
  @type instances: list
658
  @param instances: list of instance names or None for all instances
659
  @rtype: list
660
  @return: the list of instances, sorted
661
  @raise errors.OpPrereqError: if the instances parameter is wrong type
662
  @raise errors.OpPrereqError: if any of the passed instances is not found
663

664
  """
665
  if instances:
666
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
667
  else:
668
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
669
  return wanted
670

    
671

    
672
def _GetUpdatedParams(old_params, update_dict,
673
                      use_default=True, use_none=False):
674
  """Return the new version of a parameter dictionary.
675

676
  @type old_params: dict
677
  @param old_params: old parameters
678
  @type update_dict: dict
679
  @param update_dict: dict containing new parameter values, or
680
      constants.VALUE_DEFAULT to reset the parameter to its default
681
      value
682
  @type use_default: boolean
683
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
684
      values as 'to be deleted' values
685
  @type use_none: boolean
686
  @param use_none: whether to recognise C{None} values as 'to be
687
      deleted' values
688
  @rtype: dict
689
  @return: the new parameter dictionary
690

691
  """
692
  params_copy = copy.deepcopy(old_params)
693
  for key, val in update_dict.iteritems():
694
    if ((use_default and val == constants.VALUE_DEFAULT) or
695
        (use_none and val is None)):
696
      try:
697
        del params_copy[key]
698
      except KeyError:
699
        pass
700
    else:
701
      params_copy[key] = val
702
  return params_copy
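# Behaviour sketch for _GetUpdatedParams (added example, values hypothetical):
#
#   _GetUpdatedParams({"a": 1, "b": 2},
#                     {"b": constants.VALUE_DEFAULT, "c": 3})
#   -> {"a": 1, "c": 3}    # "b" reverts to its default, "c" is added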
703

    
704

    
705
def _CheckOutputFields(static, dynamic, selected):
706
  """Checks whether all selected fields are valid.
707

708
  @type static: L{utils.FieldSet}
709
  @param static: static fields set
710
  @type dynamic: L{utils.FieldSet}
711
  @param dynamic: dynamic fields set
712

713
  """
714
  f = utils.FieldSet()
715
  f.Extend(static)
716
  f.Extend(dynamic)
717

    
718
  delta = f.NonMatching(selected)
719
  if delta:
720
    raise errors.OpPrereqError("Unknown output fields selected: %s"
721
                               % ",".join(delta), errors.ECODE_INVAL)
722

    
723

    
724
def _CheckGlobalHvParams(params):
725
  """Validates that given hypervisor params are not global ones.
726

727
  This will ensure that instances don't get customised versions of
728
  global params.
729

730
  """
731
  used_globals = constants.HVC_GLOBALS.intersection(params)
732
  if used_globals:
733
    msg = ("The following hypervisor parameters are global and cannot"
734
           " be customized at instance level, please modify them at"
735
           " cluster level: %s" % utils.CommaJoin(used_globals))
736
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
737

    
738

    
739
def _CheckNodeOnline(lu, node):
740
  """Ensure that a given node is online.
741

742
  @param lu: the LU on behalf of which we make the check
743
  @param node: the node to check
744
  @raise errors.OpPrereqError: if the node is offline
745

746
  """
747
  if lu.cfg.GetNodeInfo(node).offline:
748
    raise errors.OpPrereqError("Can't use offline node %s" % node,
749
                               errors.ECODE_INVAL)
750

    
751

    
752
def _CheckNodeNotDrained(lu, node):
753
  """Ensure that a given node is not drained.
754

755
  @param lu: the LU on behalf of which we make the check
756
  @param node: the node to check
757
  @raise errors.OpPrereqError: if the node is drained
758

759
  """
760
  if lu.cfg.GetNodeInfo(node).drained:
761
    raise errors.OpPrereqError("Can't use drained node %s" % node,
762
                               errors.ECODE_INVAL)
763

    
764

    
765
def _CheckNodeHasOS(lu, node, os_name, force_variant):
766
  """Ensure that a node supports a given OS.
767

768
  @param lu: the LU on behalf of which we make the check
769
  @param node: the node to check
770
  @param os_name: the OS to query about
771
  @param force_variant: whether to ignore variant errors
772
  @raise errors.OpPrereqError: if the node does not support the OS
773

774
  """
775
  result = lu.rpc.call_os_get(node, os_name)
776
  result.Raise("OS '%s' not in supported OS list for node %s" %
777
               (os_name, node),
778
               prereq=True, ecode=errors.ECODE_INVAL)
779
  if not force_variant:
780
    _CheckOSVariant(result.payload, os_name)
781

    
782

    
783
def _RequireFileStorage():
784
  """Checks that file storage is enabled.
785

786
  @raise errors.OpPrereqError: when file storage is disabled
787

788
  """
789
  if not constants.ENABLE_FILE_STORAGE:
790
    raise errors.OpPrereqError("File storage disabled at configure time",
791
                               errors.ECODE_INVAL)
792

    
793

    
794
def _CheckDiskTemplate(template):
795
  """Ensure a given disk template is valid.
796

797
  """
798
  if template not in constants.DISK_TEMPLATES:
799
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
800
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
801
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
802
  if template == constants.DT_FILE:
803
    _RequireFileStorage()
804
  return True
805

    
806

    
807
def _CheckStorageType(storage_type):
808
  """Ensure a given storage type is valid.
809

810
  """
811
  if storage_type not in constants.VALID_STORAGE_TYPES:
812
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
813
                               errors.ECODE_INVAL)
814
  if storage_type == constants.ST_FILE:
815
    _RequireFileStorage()
816
  return True
817

    
818

    
819
def _GetClusterDomainSecret():
820
  """Reads the cluster domain secret.
821

822
  """
823
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
824
                               strict=True)
825

    
826

    
827
def _CheckInstanceDown(lu, instance, reason):
828
  """Ensure that an instance is not running."""
829
  if instance.admin_up:
830
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
831
                               (instance.name, reason), errors.ECODE_STATE)
832

    
833
  pnode = instance.primary_node
834
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
835
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
836
              prereq=True, ecode=errors.ECODE_ENVIRON)
837

    
838
  if instance.name in ins_l.payload:
839
    raise errors.OpPrereqError("Instance %s is running, %s" %
840
                               (instance.name, reason), errors.ECODE_STATE)
841

    
842

    
843
def _ExpandItemName(fn, name, kind):
844
  """Expand an item name.
845

846
  @param fn: the function to use for expansion
847
  @param name: requested item name
848
  @param kind: text description ('Node' or 'Instance')
849
  @return: the resolved (full) name
850
  @raise errors.OpPrereqError: if the item is not found
851

852
  """
853
  full_name = fn(name)
854
  if full_name is None:
855
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
856
                               errors.ECODE_NOENT)
857
  return full_name
858

    
859

    
860
def _ExpandNodeName(cfg, name):
861
  """Wrapper over L{_ExpandItemName} for nodes."""
862
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
863

    
864

    
865
def _ExpandInstanceName(cfg, name):
866
  """Wrapper over L{_ExpandItemName} for instance."""
867
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
868

    
869

    
870
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
871
                          memory, vcpus, nics, disk_template, disks,
872
                          bep, hvp, hypervisor_name):
873
  """Builds instance related env variables for hooks
874

875
  This builds the hook environment from individual variables.
876

877
  @type name: string
878
  @param name: the name of the instance
879
  @type primary_node: string
880
  @param primary_node: the name of the instance's primary node
881
  @type secondary_nodes: list
882
  @param secondary_nodes: list of secondary nodes as strings
883
  @type os_type: string
884
  @param os_type: the name of the instance's OS
885
  @type status: boolean
886
  @param status: the should_run status of the instance
887
  @type memory: string
888
  @param memory: the memory size of the instance
889
  @type vcpus: string
890
  @param vcpus: the count of VCPUs the instance has
891
  @type nics: list
892
  @param nics: list of tuples (ip, mac, mode, link) representing
893
      the NICs the instance has
894
  @type disk_template: string
895
  @param disk_template: the disk template of the instance
896
  @type disks: list
897
  @param disks: the list of (size, mode) pairs
898
  @type bep: dict
899
  @param bep: the backend parameters for the instance
900
  @type hvp: dict
901
  @param hvp: the hypervisor parameters for the instance
902
  @type hypervisor_name: string
903
  @param hypervisor_name: the hypervisor for the instance
904
  @rtype: dict
905
  @return: the hook environment for this instance
906

907
  """
908
  if status:
909
    str_status = "up"
910
  else:
911
    str_status = "down"
912
  env = {
913
    "OP_TARGET": name,
914
    "INSTANCE_NAME": name,
915
    "INSTANCE_PRIMARY": primary_node,
916
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
917
    "INSTANCE_OS_TYPE": os_type,
918
    "INSTANCE_STATUS": str_status,
919
    "INSTANCE_MEMORY": memory,
920
    "INSTANCE_VCPUS": vcpus,
921
    "INSTANCE_DISK_TEMPLATE": disk_template,
922
    "INSTANCE_HYPERVISOR": hypervisor_name,
923
  }
924

    
925
  if nics:
926
    nic_count = len(nics)
927
    for idx, (ip, mac, mode, link) in enumerate(nics):
928
      if ip is None:
929
        ip = ""
930
      env["INSTANCE_NIC%d_IP" % idx] = ip
931
      env["INSTANCE_NIC%d_MAC" % idx] = mac
932
      env["INSTANCE_NIC%d_MODE" % idx] = mode
933
      env["INSTANCE_NIC%d_LINK" % idx] = link
934
      if mode == constants.NIC_MODE_BRIDGED:
935
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
936
  else:
937
    nic_count = 0
938

    
939
  env["INSTANCE_NIC_COUNT"] = nic_count
940

    
941
  if disks:
942
    disk_count = len(disks)
943
    for idx, (size, mode) in enumerate(disks):
944
      env["INSTANCE_DISK%d_SIZE" % idx] = size
945
      env["INSTANCE_DISK%d_MODE" % idx] = mode
946
  else:
947
    disk_count = 0
948

    
949
  env["INSTANCE_DISK_COUNT"] = disk_count
950

    
951
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
952
    for key, value in source.items():
953
      env["INSTANCE_%s_%s" % (kind, key)] = value
954

    
955
  return env
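# Example of the dict returned above (added illustration; the values shown are
# hypothetical) for an instance with one bridged NIC and one disk:
#
#   INSTANCE_NAME=inst1  INSTANCE_STATUS=up  INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MODE=bridged  INSTANCE_NIC0_LINK=br0  INSTANCE_NIC0_BRIDGE=br0
#   INSTANCE_DISK_COUNT=1  INSTANCE_DISK0_SIZE=10240  INSTANCE_DISK0_MODE=rw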
956

    
957

    
958
def _NICListToTuple(lu, nics):
959
  """Build a list of nic information tuples.
960

961
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
962
  value in LUQueryInstanceData.
963

964
  @type lu:  L{LogicalUnit}
965
  @param lu: the logical unit on whose behalf we execute
966
  @type nics: list of L{objects.NIC}
967
  @param nics: list of nics to convert to hooks tuples
968

969
  """
970
  hooks_nics = []
971
  cluster = lu.cfg.GetClusterInfo()
972
  for nic in nics:
973
    ip = nic.ip
974
    mac = nic.mac
975
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
976
    mode = filled_params[constants.NIC_MODE]
977
    link = filled_params[constants.NIC_LINK]
978
    hooks_nics.append((ip, mac, mode, link))
979
  return hooks_nics
980

    
981

    
982
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
983
  """Builds instance related env variables for hooks from an object.
984

985
  @type lu: L{LogicalUnit}
986
  @param lu: the logical unit on whose behalf we execute
987
  @type instance: L{objects.Instance}
988
  @param instance: the instance for which we should build the
989
      environment
990
  @type override: dict
991
  @param override: dictionary with key/values that will override
992
      our values
993
  @rtype: dict
994
  @return: the hook environment dictionary
995

996
  """
997
  cluster = lu.cfg.GetClusterInfo()
998
  bep = cluster.FillBE(instance)
999
  hvp = cluster.FillHV(instance)
1000
  args = {
1001
    'name': instance.name,
1002
    'primary_node': instance.primary_node,
1003
    'secondary_nodes': instance.secondary_nodes,
1004
    'os_type': instance.os,
1005
    'status': instance.admin_up,
1006
    'memory': bep[constants.BE_MEMORY],
1007
    'vcpus': bep[constants.BE_VCPUS],
1008
    'nics': _NICListToTuple(lu, instance.nics),
1009
    'disk_template': instance.disk_template,
1010
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
1011
    'bep': bep,
1012
    'hvp': hvp,
1013
    'hypervisor_name': instance.hypervisor,
1014
  }
1015
  if override:
1016
    args.update(override)
1017
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1018

    
1019

    
1020
def _AdjustCandidatePool(lu, exceptions):
1021
  """Adjust the candidate pool after node operations.
1022

1023
  """
1024
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1025
  if mod_list:
1026
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1027
               utils.CommaJoin(node.name for node in mod_list))
1028
    for name in mod_list:
1029
      lu.context.ReaddNode(name)
1030
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1031
  if mc_now > mc_max:
1032
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1033
               (mc_now, mc_max))
1034

    
1035

    
1036
def _DecideSelfPromotion(lu, exceptions=None):
1037
  """Decide whether I should promote myself as a master candidate.
1038

1039
  """
1040
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1041
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1042
  # the new node will increase mc_max with one, so:
1043
  mc_should = min(mc_should + 1, cp_size)
1044
  return mc_now < mc_should
1045

    
1046

    
1047
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1048
  """Check that the brigdes needed by a list of nics exist.
1049

1050
  """
1051
  cluster = lu.cfg.GetClusterInfo()
1052
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1053
  brlist = [params[constants.NIC_LINK] for params in paramslist
1054
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1055
  if brlist:
1056
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1057
    result.Raise("Error checking bridges on destination node '%s'" %
1058
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1059

    
1060

    
1061
def _CheckInstanceBridgesExist(lu, instance, node=None):
1062
  """Check that the brigdes needed by an instance exist.
1063

1064
  """
1065
  if node is None:
1066
    node = instance.primary_node
1067
  _CheckNicsBridgesExist(lu, instance.nics, node)
1068

    
1069

    
1070
def _CheckOSVariant(os_obj, name):
1071
  """Check whether an OS name conforms to the os variants specification.
1072

1073
  @type os_obj: L{objects.OS}
1074
  @param os_obj: OS object to check
1075
  @type name: string
1076
  @param name: OS name passed by the user, to check for validity
1077

1078
  """
1079
  if not os_obj.supported_variants:
1080
    return
1081
  try:
1082
    variant = name.split("+", 1)[1]
1083
  except IndexError:
1084
    raise errors.OpPrereqError("OS name must include a variant",
1085
                               errors.ECODE_INVAL)
1086

    
1087
  if variant not in os_obj.supported_variants:
1088
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1089

    
1090

    
1091
def _GetNodeInstancesInner(cfg, fn):
1092
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1093

    
1094

    
1095
def _GetNodeInstances(cfg, node_name):
1096
  """Returns a list of all primary and secondary instances on a node.
1097

1098
  """
1099

    
1100
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1101

    
1102

    
1103
def _GetNodePrimaryInstances(cfg, node_name):
1104
  """Returns primary instances on a node.
1105

1106
  """
1107
  return _GetNodeInstancesInner(cfg,
1108
                                lambda inst: node_name == inst.primary_node)
1109

    
1110

    
1111
def _GetNodeSecondaryInstances(cfg, node_name):
1112
  """Returns secondary instances on a node.
1113

1114
  """
1115
  return _GetNodeInstancesInner(cfg,
1116
                                lambda inst: node_name in inst.secondary_nodes)
1117

    
1118

    
1119
def _GetStorageTypeArgs(cfg, storage_type):
1120
  """Returns the arguments for a storage type.
1121

1122
  """
1123
  # Special case for file storage
1124
  if storage_type == constants.ST_FILE:
1125
    # storage.FileStorage wants a list of storage directories
1126
    return [[cfg.GetFileStorageDir()]]
1127

    
1128
  return []
1129

    
1130

    
1131
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1132
  faulty = []
1133

    
1134
  for dev in instance.disks:
1135
    cfg.SetDiskID(dev, node_name)
1136

    
1137
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1138
  result.Raise("Failed to get disk status from node %s" % node_name,
1139
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1140

    
1141
  for idx, bdev_status in enumerate(result.payload):
1142
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1143
      faulty.append(idx)
1144

    
1145
  return faulty
1146

    
1147

    
1148
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1149
  """Check the sanity of iallocator and node arguments and use the
1150
  cluster-wide iallocator if appropriate.
1151

1152
  Check that at most one of (iallocator, node) is specified. If none is
1153
  specified, then the LU's opcode's iallocator slot is filled with the
1154
  cluster-wide default iallocator.
1155

1156
  @type iallocator_slot: string
1157
  @param iallocator_slot: the name of the opcode iallocator slot
1158
  @type node_slot: string
1159
  @param node_slot: the name of the opcode target node slot
1160

1161
  """
1162
  node = getattr(lu.op, node_slot, None)
1163
  iallocator = getattr(lu.op, iallocator_slot, None)
1164

    
1165
  if node is not None and iallocator is not None:
1166
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1167
                               errors.ECODE_INVAL)
1168
  elif node is None and iallocator is None:
1169
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1170
    if default_iallocator:
1171
      setattr(lu.op, iallocator_slot, default_iallocator)
1172
    else:
1173
      raise errors.OpPrereqError("No iallocator or node given and no"
1174
                                 " cluster-wide default iallocator found."
1175
                                 " Please specify either an iallocator or a"
1176
                                 " node, or set a cluster-wide default"
1177
                                 " iallocator.")
1178

    
1179

    
1180
class LUPostInitCluster(LogicalUnit):
1181
  """Logical unit for running hooks after cluster initialization.
1182

1183
  """
1184
  HPATH = "cluster-init"
1185
  HTYPE = constants.HTYPE_CLUSTER
1186

    
1187
  def BuildHooksEnv(self):
1188
    """Build hooks env.
1189

1190
    """
1191
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1192
    mn = self.cfg.GetMasterNode()
1193
    return env, [], [mn]
1194

    
1195
  def Exec(self, feedback_fn):
1196
    """Nothing to do.
1197

1198
    """
1199
    return True
1200

    
1201

    
1202
class LUDestroyCluster(LogicalUnit):
1203
  """Logical unit for destroying the cluster.
1204

1205
  """
1206
  HPATH = "cluster-destroy"
1207
  HTYPE = constants.HTYPE_CLUSTER
1208

    
1209
  def BuildHooksEnv(self):
1210
    """Build hooks env.
1211

1212
    """
1213
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1214
    return env, [], []
1215

    
1216
  def CheckPrereq(self):
1217
    """Check prerequisites.
1218

1219
    This checks whether the cluster is empty.
1220

1221
    Any errors are signaled by raising errors.OpPrereqError.
1222

1223
    """
1224
    master = self.cfg.GetMasterNode()
1225

    
1226
    nodelist = self.cfg.GetNodeList()
1227
    if len(nodelist) != 1 or nodelist[0] != master:
1228
      raise errors.OpPrereqError("There are still %d node(s) in"
1229
                                 " this cluster." % (len(nodelist) - 1),
1230
                                 errors.ECODE_INVAL)
1231
    instancelist = self.cfg.GetInstanceList()
1232
    if instancelist:
1233
      raise errors.OpPrereqError("There are still %d instance(s) in"
1234
                                 " this cluster." % len(instancelist),
1235
                                 errors.ECODE_INVAL)
1236

    
1237
  def Exec(self, feedback_fn):
1238
    """Destroys the cluster.
1239

1240
    """
1241
    master = self.cfg.GetMasterNode()
1242
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1243

    
1244
    # Run post hooks on master node before it's removed
1245
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
1246
    try:
1247
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
1248
    except:
1249
      # pylint: disable-msg=W0702
1250
      self.LogWarning("Errors occurred running hooks on %s" % master)
1251

    
1252
    result = self.rpc.call_node_stop_master(master, False)
1253
    result.Raise("Could not disable the master role")
1254

    
1255
    if modify_ssh_setup:
1256
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1257
      utils.CreateBackup(priv_key)
1258
      utils.CreateBackup(pub_key)
1259

    
1260
    return master
1261

    
1262

    
1263
def _VerifyCertificate(filename):
1264
  """Verifies a certificate for LUVerifyCluster.
1265

1266
  @type filename: string
1267
  @param filename: Path to PEM file
1268

1269
  """
1270
  try:
1271
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1272
                                           utils.ReadFile(filename))
1273
  except Exception, err: # pylint: disable-msg=W0703
1274
    return (LUVerifyCluster.ETYPE_ERROR,
1275
            "Failed to load X509 certificate %s: %s" % (filename, err))
1276

    
1277
  (errcode, msg) = \
1278
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1279
                                constants.SSL_CERT_EXPIRATION_ERROR)
1280

    
1281
  if msg:
1282
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1283
  else:
1284
    fnamemsg = None
1285

    
1286
  if errcode is None:
1287
    return (None, fnamemsg)
1288
  elif errcode == utils.CERT_WARNING:
1289
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
1290
  elif errcode == utils.CERT_ERROR:
1291
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
1292

    
1293
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1294

    
1295

    
1296
class LUVerifyCluster(LogicalUnit):
1297
  """Verifies the cluster status.
1298

1299
  """
1300
  HPATH = "cluster-verify"
1301
  HTYPE = constants.HTYPE_CLUSTER
1302
  _OP_PARAMS = [
1303
    ("skip_checks", _EmptyList,
1304
     _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1305
    ("verbose", False, _TBool),
1306
    ("error_codes", False, _TBool),
1307
    ("debug_simulate_errors", False, _TBool),
1308
    ]
1309
  REQ_BGL = False
1310

    
1311
  TCLUSTER = "cluster"
1312
  TNODE = "node"
1313
  TINSTANCE = "instance"
1314

    
1315
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1316
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1317
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1318
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1319
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1320
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1322
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1323
  ENODEDRBD = (TNODE, "ENODEDRBD")
1324
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1325
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1326
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1327
  ENODEHV = (TNODE, "ENODEHV")
1328
  ENODELVM = (TNODE, "ENODELVM")
1329
  ENODEN1 = (TNODE, "ENODEN1")
1330
  ENODENET = (TNODE, "ENODENET")
1331
  ENODEOS = (TNODE, "ENODEOS")
1332
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1333
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1334
  ENODERPC = (TNODE, "ENODERPC")
1335
  ENODESSH = (TNODE, "ENODESSH")
1336
  ENODEVERSION = (TNODE, "ENODEVERSION")
1337
  ENODESETUP = (TNODE, "ENODESETUP")
1338
  ENODETIME = (TNODE, "ENODETIME")
1339

    
1340
  ETYPE_FIELD = "code"
1341
  ETYPE_ERROR = "ERROR"
1342
  ETYPE_WARNING = "WARNING"
1343

    
1344
  class NodeImage(object):
1345
    """A class representing the logical and physical status of a node.
1346

1347
    @type name: string
1348
    @ivar name: the node name to which this object refers
1349
    @ivar volumes: a structure as returned from
1350
        L{ganeti.backend.GetVolumeList} (runtime)
1351
    @ivar instances: a list of running instances (runtime)
1352
    @ivar pinst: list of configured primary instances (config)
1353
    @ivar sinst: list of configured secondary instances (config)
1354
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
1355
        of this node (config)
1356
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1357
    @ivar dfree: free disk, as reported by the node (runtime)
1358
    @ivar offline: the offline status (config)
1359
    @type rpc_fail: boolean
1360
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1361
        not whether the individual keys were correct) (runtime)
1362
    @type lvm_fail: boolean
1363
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1364
    @type hyp_fail: boolean
1365
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1366
    @type ghost: boolean
1367
    @ivar ghost: whether this is a known node or not (config)
1368
    @type os_fail: boolean
1369
    @ivar os_fail: whether the RPC call didn't return valid OS data
1370
    @type oslist: list
1371
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1372

1373
    """
1374
    def __init__(self, offline=False, name=None):
1375
      self.name = name
1376
      self.volumes = {}
1377
      self.instances = []
1378
      self.pinst = []
1379
      self.sinst = []
1380
      self.sbp = {}
1381
      self.mfree = 0
1382
      self.dfree = 0
1383
      self.offline = offline
1384
      self.rpc_fail = False
1385
      self.lvm_fail = False
1386
      self.hyp_fail = False
1387
      self.ghost = False
1388
      self.os_fail = False
1389
      self.oslist = {}
1390

    
1391
  def ExpandNames(self):
1392
    self.needed_locks = {
1393
      locking.LEVEL_NODE: locking.ALL_SET,
1394
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1395
    }
1396
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1397

    
1398
  def _Error(self, ecode, item, msg, *args, **kwargs):
1399
    """Format an error message.
1400

1401
    Based on the opcode's error_codes parameter, either format a
1402
    parseable error code, or a simpler error string.
1403

1404
    This must be called only from Exec and functions called from Exec.
1405

1406
    """
1407
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1408
    itype, etxt = ecode
1409
    # first complete the msg
1410
    if args:
1411
      msg = msg % args
1412
    # then format the whole message
1413
    if self.op.error_codes:
1414
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1415
    else:
1416
      if item:
1417
        item = " " + item
1418
      else:
1419
        item = ""
1420
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1421
    # and finally report it via the feedback_fn
1422
    self._feedback_fn("  - %s" % msg)
1423

    
1424
  def _ErrorIf(self, cond, *args, **kwargs):
1425
    """Log an error message if the passed condition is True.
1426

1427
    """
1428
    cond = bool(cond) or self.op.debug_simulate_errors
1429
    if cond:
1430
      self._Error(*args, **kwargs)
1431
    # do not mark the operation as failed for WARN cases only
1432
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1433
      self.bad = self.bad or cond
1434

    
1435
  def _VerifyNode(self, ninfo, nresult):
1436
    """Run multiple tests against a node.
1437

1438
    Test list:
1439

1440
      - compares ganeti version
1441
      - checks vg existence and size > 20G
1442
      - checks config file checksum
1443
      - checks ssh to other nodes
1444

1445
    @type ninfo: L{objects.Node}
1446
    @param ninfo: the node to check
1447
    @param nresult: the results from the node
1448
    @rtype: boolean
1449
    @return: whether overall this call was successful (and we can expect
1450
         reasonable values in the response)
1451

1452
    """
1453
    node = ninfo.name
1454
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1455

    
1456
    # main result, nresult should be a non-empty dict
1457
    test = not nresult or not isinstance(nresult, dict)
1458
    _ErrorIf(test, self.ENODERPC, node,
1459
                  "unable to verify node: no data returned")
1460
    if test:
1461
      return False
1462

    
1463
    # compares ganeti version
1464
    local_version = constants.PROTOCOL_VERSION
1465
    remote_version = nresult.get("version", None)
1466
    test = not (remote_version and
1467
                isinstance(remote_version, (list, tuple)) and
1468
                len(remote_version) == 2)
1469
    _ErrorIf(test, self.ENODERPC, node,
1470
             "connection to node returned invalid data")
1471
    if test:
1472
      return False
1473

    
1474
    test = local_version != remote_version[0]
1475
    _ErrorIf(test, self.ENODEVERSION, node,
1476
             "incompatible protocol versions: master %s,"
1477
             " node %s", local_version, remote_version[0])
1478
    if test:
1479
      return False
1480

    
1481
    # node seems compatible, we can actually try to look into its results
1482

    
1483
    # full package version
1484
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1485
                  self.ENODEVERSION, node,
1486
                  "software version mismatch: master %s, node %s",
1487
                  constants.RELEASE_VERSION, remote_version[1],
1488
                  code=self.ETYPE_WARNING)
1489

    
1490
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1491
    if isinstance(hyp_result, dict):
1492
      for hv_name, hv_result in hyp_result.iteritems():
1493
        test = hv_result is not None
1494
        _ErrorIf(test, self.ENODEHV, node,
1495
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1496

    
1497

    
1498
    test = nresult.get(constants.NV_NODESETUP,
1499
                           ["Missing NODESETUP results"])
1500
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1501
             "; ".join(test))
1502

    
1503
    return True
1504

    
1505
  def _VerifyNodeTime(self, ninfo, nresult,
1506
                      nvinfo_starttime, nvinfo_endtime):
1507
    """Check the node time.
1508

1509
    @type ninfo: L{objects.Node}
1510
    @param ninfo: the node to check
1511
    @param nresult: the remote results for the node
1512
    @param nvinfo_starttime: the start time of the RPC call
1513
    @param nvinfo_endtime: the end time of the RPC call
1514

1515
    """
1516
    node = ninfo.name
1517
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1518

    
1519
    ntime = nresult.get(constants.NV_TIME, None)
1520
    try:
1521
      ntime_merged = utils.MergeTime(ntime)
1522
    except (ValueError, TypeError):
1523
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1524
      return
1525

    
1526
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1527
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1528
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1529
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1530
    else:
1531
      ntime_diff = None
1532

    
1533
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1534
             "Node time diverges by at least %s from master node time",
1535
             ntime_diff)
1536

    
1537
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

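  # Data shape note for the PV check above (comment only, names are
  # illustrative): the NV_PVLIST payload is iterated as three-element rows,
  # roughly
  #   [(<unused here>, "/dev/sda3", "xenvg"),
  #    (<unused here>, "/dev/sdb1", "xenvg")]
  # and only the PV name and its owning VG are inspected.  A PV name such as
  # "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0-part3" would be flagged
  # because of the ':' characters, which lvcreate treats as a physical-extent
  # range separator.
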
  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

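  # Data shape note for the volume check above (comment only, names are
  # illustrative): MapLVsByNode fills node_vol_should with a mapping of node
  # name to the logical volumes the instance expects there, e.g.
  #   {"node1.example.com": ["xenvg/disk0_data", "xenvg/disk0_meta"],
  #    "node2.example.com": ["xenvg/disk0_data", "xenvg/disk0_meta"]}
  # and each expected volume is then looked up in the per-node image built
  # from the NV_LVLIST payload.
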
  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

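  # Worked example for the N+1 check above (comment only, figures are
  # illustrative): assume node B is secondary for two auto-balanced instances
  # whose primary is node A, with BE_MEMORY of 2048 and 4096 MiB, plus one
  # instance with auto_balance disabled.  The required reserve on B for a
  # failure of A is 2048 + 4096 = 6144 MiB; if B reports mfree below 6144 an
  # ENODEN1 error is raised for B, naming A as the peer node.
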
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1710
                       master_files):
1711
    """Verifies and computes the node required file checksums.
1712

1713
    @type ninfo: L{objects.Node}
1714
    @param ninfo: the node to check
1715
    @param nresult: the remote results for the node
1716
    @param file_list: required list of files
1717
    @param local_cksum: dictionary of local files and their checksums
1718
    @param master_files: list of files that only masters should have
1719

1720
    """
1721
    node = ninfo.name
1722
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1723

    
1724
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1725
    test = not isinstance(remote_cksum, dict)
1726
    _ErrorIf(test, self.ENODEFILECHECK, node,
1727
             "node hasn't returned file checksum data")
1728
    if test:
1729
      return
1730

    
1731
    for file_name in file_list:
1732
      node_is_mc = ninfo.master_candidate
1733
      must_have = (file_name not in master_files) or node_is_mc
1734
      # missing
1735
      test1 = file_name not in remote_cksum
1736
      # invalid checksum
1737
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1738
      # existing and good
1739
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1740
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1741
               "file '%s' missing", file_name)
1742
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1743
               "file '%s' has wrong checksum", file_name)
1744
      # not candidate and this is not a must-have file
1745
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1746
               "file '%s' should not exist on non master"
1747
               " candidates (and the file is outdated)", file_name)
1748
      # all good, except non-master/non-must have combination
1749
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1750
               "file '%s' should not exist"
1751
               " on non master candidates", file_name)
1752

    
1753
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1754
                      drbd_map):
1755
    """Verifies and the node DRBD status.
1756

1757
    @type ninfo: L{objects.Node}
1758
    @param ninfo: the node to check
1759
    @param nresult: the remote results for the node
1760
    @param instanceinfo: the dict of instances
1761
    @param drbd_helper: the configured DRBD usermode helper
1762
    @param drbd_map: the DRBD map as returned by
1763
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1764

1765
    """
1766
    node = ninfo.name
1767
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1768

    
1769
    if drbd_helper:
1770
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1771
      test = (helper_result is None)
1772
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1773
               "no drbd usermode helper returned")
1774
      if helper_result:
1775
        status, payload = helper_result
1776
        test = not status
1777
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1778
                 "drbd usermode helper check unsuccessful: %s", payload)
1779
        test = status and (payload != drbd_helper)
1780
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1781
                 "wrong drbd usermode helper: %s", payload)
1782

    
1783
    # compute the DRBD minors
1784
    node_drbd = {}
1785
    for minor, instance in drbd_map[node].items():
1786
      test = instance not in instanceinfo
1787
      _ErrorIf(test, self.ECLUSTERCFG, None,
1788
               "ghost instance '%s' in temporary DRBD map", instance)
1789
        # ghost instance should not be running, but otherwise we
1790
        # don't give double warnings (both ghost instance and
1791
        # unallocated minor in use)
1792
      if test:
1793
        node_drbd[minor] = (instance, False)
1794
      else:
1795
        instance = instanceinfo[instance]
1796
        node_drbd[minor] = (instance.name, instance.admin_up)
1797

    
1798
    # and now check them
1799
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1800
    test = not isinstance(used_minors, (tuple, list))
1801
    _ErrorIf(test, self.ENODEDRBD, node,
1802
             "cannot parse drbd status file: %s", str(used_minors))
1803
    if test:
1804
      # we cannot check drbd status
1805
      return
1806

    
1807
    for minor, (iname, must_exist) in node_drbd.items():
1808
      test = minor not in used_minors and must_exist
1809
      _ErrorIf(test, self.ENODEDRBD, node,
1810
               "drbd minor %d of instance %s is not active", minor, iname)
1811
    for minor in used_minors:
1812
      test = minor not in node_drbd
1813
      _ErrorIf(test, self.ENODEDRBD, node,
1814
               "unallocated drbd minor %d is in use", minor)
1815

    
1816
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1817
    """Builds the node OS structures.
1818

1819
    @type ninfo: L{objects.Node}
1820
    @param ninfo: the node to check
1821
    @param nresult: the remote results for the node
1822
    @param nimg: the node image object
1823

1824
    """
1825
    node = ninfo.name
1826
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1827

    
1828
    remote_os = nresult.get(constants.NV_OSLIST, None)
1829
    test = (not isinstance(remote_os, list) or
1830
            not compat.all(isinstance(v, list) and len(v) == 7
1831
                           for v in remote_os))
1832

    
1833
    _ErrorIf(test, self.ENODEOS, node,
1834
             "node hasn't returned valid OS data")
1835

    
1836
    nimg.os_fail = test
1837

    
1838
    if test:
1839
      return
1840

    
1841
    os_dict = {}
1842

    
1843
    for (name, os_path, status, diagnose,
1844
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1845

    
1846
      if name not in os_dict:
1847
        os_dict[name] = []
1848

    
1849
      # parameters is a list of lists instead of list of tuples due to
1850
      # JSON lacking a real tuple type, fix it:
1851
      parameters = [tuple(v) for v in parameters]
1852
      os_dict[name].append((os_path, status, diagnose,
1853
                            set(variants), set(parameters), set(api_ver)))
1854

    
1855
    nimg.oslist = os_dict
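
    # Resulting structure (comment only, names are illustrative):
    #   nimg.oslist = {
    #     "debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
    #                      set(["default", "minimal"]), set(), set([20]))],
    #   }
    # i.e. per OS name, a list of (path, status, diagnose message, variants,
    # parameters, API versions) tuples; multiple entries mean the OS name is
    # shadowed by an earlier search-path entry.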
1856

    
1857
  def _VerifyNodeOS(self, ninfo, nimg, base):
1858
    """Verifies the node OS list.
1859

1860
    @type ninfo: L{objects.Node}
1861
    @param ninfo: the node to check
1862
    @param nimg: the node image object
1863
    @param base: the 'template' node we match against (e.g. from the master)
1864

1865
    """
1866
    node = ninfo.name
1867
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1868

    
1869
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1870

    
1871
    for os_name, os_data in nimg.oslist.items():
1872
      assert os_data, "Empty OS status for OS %s?!" % os_name
1873
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1874
      _ErrorIf(not f_status, self.ENODEOS, node,
1875
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1876
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1877
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1878
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1879
      # this will be caught in the backend too
1880
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1881
               and not f_var, self.ENODEOS, node,
1882
               "OS %s with API at least %d does not declare any variant",
1883
               os_name, constants.OS_API_V15)
1884
      # comparisons with the 'base' image
1885
      test = os_name not in base.oslist
1886
      _ErrorIf(test, self.ENODEOS, node,
1887
               "Extra OS %s not present on reference node (%s)",
1888
               os_name, base.name)
1889
      if test:
1890
        continue
1891
      assert base.oslist[os_name], "Base node has empty OS status?"
1892
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1893
      if not b_status:
1894
        # base OS is invalid, skipping
1895
        continue
1896
      for kind, a, b in [("API version", f_api, b_api),
1897
                         ("variants list", f_var, b_var),
1898
                         ("parameters", f_param, b_param)]:
1899
        _ErrorIf(a != b, self.ENODEOS, node,
1900
                 "OS %s %s differs from reference node %s: %s vs. %s",
1901
                 kind, os_name, base.name,
1902
                 utils.CommaJoin(a), utils.CommaJoin(b))
1903

    
1904
    # check any missing OSes
1905
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1906
    _ErrorIf(missing, self.ENODEOS, node,
1907
             "OSes present on reference node %s but missing on this node: %s",
1908
             base.name, utils.CommaJoin(missing))
1909

    
1910
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1911
    """Verifies and updates the node volume data.
1912

1913
    This function will update a L{NodeImage}'s internal structures
1914
    with data from the remote call.
1915

1916
    @type ninfo: L{objects.Node}
1917
    @param ninfo: the node to check
1918
    @param nresult: the remote results for the node
1919
    @param nimg: the node image object
1920
    @param vg_name: the configured VG name
1921

1922
    """
1923
    node = ninfo.name
1924
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1925

    
1926
    nimg.lvm_fail = True
1927
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1928
    if vg_name is None:
1929
      pass
1930
    elif isinstance(lvdata, basestring):
1931
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1932
               utils.SafeEncode(lvdata))
1933
    elif not isinstance(lvdata, dict):
1934
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1935
    else:
1936
      nimg.volumes = lvdata
1937
      nimg.lvm_fail = False
1938

    
1939
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1940
    """Verifies and updates the node instance list.
1941

1942
    If the listing was successful, then updates this node's instance
1943
    list. Otherwise, it marks the RPC call as failed for the instance
1944
    list key.
1945

1946
    @type ninfo: L{objects.Node}
1947
    @param ninfo: the node to check
1948
    @param nresult: the remote results for the node
1949
    @param nimg: the node image object
1950

1951
    """
1952
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1953
    test = not isinstance(idata, list)
1954
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1955
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1956
    if test:
1957
      nimg.hyp_fail = True
1958
    else:
1959
      nimg.instances = idata
1960

    
1961
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1962
    """Verifies and computes a node information map
1963

1964
    @type ninfo: L{objects.Node}
1965
    @param ninfo: the node to check
1966
    @param nresult: the remote results for the node
1967
    @param nimg: the node image object
1968
    @param vg_name: the configured VG name
1969

1970
    """
1971
    node = ninfo.name
1972
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1973

    
1974
    # try to read free memory (from the hypervisor)
1975
    hv_info = nresult.get(constants.NV_HVINFO, None)
1976
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
1977
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1978
    if not test:
1979
      try:
1980
        nimg.mfree = int(hv_info["memory_free"])
1981
      except (ValueError, TypeError):
1982
        _ErrorIf(True, self.ENODERPC, node,
1983
                 "node returned invalid nodeinfo, check hypervisor")
1984

    
1985
    # FIXME: devise a free space model for file based instances as well
1986
    if vg_name is not None:
1987
      test = (constants.NV_VGLIST not in nresult or
1988
              vg_name not in nresult[constants.NV_VGLIST])
1989
      _ErrorIf(test, self.ENODELVM, node,
1990
               "node didn't return data for the volume group '%s'"
1991
               " - it is either missing or broken", vg_name)
1992
      if not test:
1993
        try:
1994
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
1995
        except (ValueError, TypeError):
1996
          _ErrorIf(True, self.ENODERPC, node,
1997
                   "node returned invalid LVM info, check LVM status")
1998

    
1999
  def BuildHooksEnv(self):
2000
    """Build hooks env.
2001

2002
    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.
2004

2005
    """
2006
    all_nodes = self.cfg.GetNodeList()
2007
    env = {
2008
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2009
      }
2010
    for node in self.cfg.GetAllNodesInfo().values():
2011
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
2012

    
2013
    return env, [], all_nodes
2014

    
2015
  def Exec(self, feedback_fn):
2016
    """Verify integrity of cluster, performing various test on nodes.
2017

2018
    """
2019
    self.bad = False
2020
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2021
    verbose = self.op.verbose
2022
    self._feedback_fn = feedback_fn
2023
    feedback_fn("* Verifying global settings")
2024
    for msg in self.cfg.VerifyConfig():
2025
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2026

    
2027
    # Check the cluster certificates
2028
    for cert_filename in constants.ALL_CERT_FILES:
2029
      (errcode, msg) = _VerifyCertificate(cert_filename)
2030
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2031

    
2032
    vg_name = self.cfg.GetVGName()
2033
    drbd_helper = self.cfg.GetDRBDHelper()
2034
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2035
    cluster = self.cfg.GetClusterInfo()
2036
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2037
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2038
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2039
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2040
                        for iname in instancelist)
2041
    i_non_redundant = [] # Non redundant instances
2042
    i_non_a_balanced = [] # Non auto-balanced instances
2043
    n_offline = 0 # Count of offline nodes
2044
    n_drained = 0 # Count of nodes being drained
2045
    node_vol_should = {}
2046

    
2047
    # FIXME: verify OS list
2048
    # do local checksums
2049
    master_files = [constants.CLUSTER_CONF_FILE]
2050
    master_node = self.master_node = self.cfg.GetMasterNode()
2051
    master_ip = self.cfg.GetMasterIP()
2052

    
2053
    file_names = ssconf.SimpleStore().GetFileList()
2054
    file_names.extend(constants.ALL_CERT_FILES)
2055
    file_names.extend(master_files)
2056
    if cluster.modify_etc_hosts:
2057
      file_names.append(constants.ETC_HOSTS)
2058

    
2059
    local_checksums = utils.FingerprintFiles(file_names)
2060

    
2061
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2062
    node_verify_param = {
2063
      constants.NV_FILELIST: file_names,
2064
      constants.NV_NODELIST: [node.name for node in nodeinfo
2065
                              if not node.offline],
2066
      constants.NV_HYPERVISOR: hypervisors,
2067
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2068
                                  node.secondary_ip) for node in nodeinfo
2069
                                 if not node.offline],
2070
      constants.NV_INSTANCELIST: hypervisors,
2071
      constants.NV_VERSION: None,
2072
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2073
      constants.NV_NODESETUP: None,
2074
      constants.NV_TIME: None,
2075
      constants.NV_MASTERIP: (master_node, master_ip),
2076
      constants.NV_OSLIST: None,
2077
      }
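
    # Note (comment only): each NV_* key above selects a check to run on the
    # nodes; the value carries that check's argument (None when no argument
    # is needed), e.g. NV_FILELIST gets the list of files to checksum and
    # NV_NODENETTEST gets (name, primary_ip, secondary_ip) tuples to probe.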
2078

    
2079
    if vg_name is not None:
2080
      node_verify_param[constants.NV_VGLIST] = None
2081
      node_verify_param[constants.NV_LVLIST] = vg_name
2082
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2083
      node_verify_param[constants.NV_DRBDLIST] = None
2084

    
2085
    if drbd_helper:
2086
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2087

    
2088
    # Build our expected cluster state
2089
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2090
                                                 name=node.name))
2091
                      for node in nodeinfo)
2092

    
2093
    for instance in instancelist:
2094
      inst_config = instanceinfo[instance]
2095

    
2096
      for nname in inst_config.all_nodes:
2097
        if nname not in node_image:
2098
          # ghost node
2099
          gnode = self.NodeImage(name=nname)
2100
          gnode.ghost = True
2101
          node_image[nname] = gnode
2102

    
2103
      inst_config.MapLVsByNode(node_vol_should)
2104

    
2105
      pnode = inst_config.primary_node
2106
      node_image[pnode].pinst.append(instance)
2107

    
2108
      for snode in inst_config.secondary_nodes:
2109
        nimg = node_image[snode]
2110
        nimg.sinst.append(instance)
2111
        if pnode not in nimg.sbp:
2112
          nimg.sbp[pnode] = []
2113
        nimg.sbp[pnode].append(instance)
2114

    
2115
    # At this point, we have the in-memory data structures complete,
2116
    # except for the runtime information, which we'll gather next
2117

    
2118
    # Due to the way our RPC system works, exact response times cannot be
2119
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2120
    # time before and after executing the request, we can at least have a time
2121
    # window.
2122
    nvinfo_starttime = time.time()
2123
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2124
                                           self.cfg.GetClusterName())
2125
    nvinfo_endtime = time.time()
2126

    
2127
    all_drbd_map = self.cfg.ComputeDRBDMap()
2128

    
2129
    feedback_fn("* Verifying node status")
2130

    
2131
    refos_img = None
2132

    
2133
    for node_i in nodeinfo:
2134
      node = node_i.name
2135
      nimg = node_image[node]
2136

    
2137
      if node_i.offline:
2138
        if verbose:
2139
          feedback_fn("* Skipping offline node %s" % (node,))
2140
        n_offline += 1
2141
        continue
2142

    
2143
      if node == master_node:
2144
        ntype = "master"
2145
      elif node_i.master_candidate:
2146
        ntype = "master candidate"
2147
      elif node_i.drained:
2148
        ntype = "drained"
2149
        n_drained += 1
2150
      else:
2151
        ntype = "regular"
2152
      if verbose:
2153
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2154

    
2155
      msg = all_nvinfo[node].fail_msg
2156
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2157
      if msg:
2158
        nimg.rpc_fail = True
2159
        continue
2160

    
2161
      nresult = all_nvinfo[node].payload
2162

    
2163
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2164
      self._VerifyNodeNetwork(node_i, nresult)
2165
      self._VerifyNodeLVM(node_i, nresult, vg_name)
2166
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2167
                            master_files)
2168
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2169
                           all_drbd_map)
2170
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2171

    
2172
      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2173
      self._UpdateNodeInstances(node_i, nresult, nimg)
2174
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2175
      self._UpdateNodeOS(node_i, nresult, nimg)
2176
      if not nimg.os_fail:
2177
        if refos_img is None:
2178
          refos_img = nimg
2179
        self._VerifyNodeOS(node_i, nimg, refos_img)
2180

    
2181
    feedback_fn("* Verifying instance status")
2182
    for instance in instancelist:
2183
      if verbose:
2184
        feedback_fn("* Verifying instance %s" % instance)
2185
      inst_config = instanceinfo[instance]
2186
      self._VerifyInstance(instance, inst_config, node_image)
2187
      inst_nodes_offline = []
2188

    
2189
      pnode = inst_config.primary_node
2190
      pnode_img = node_image[pnode]
2191
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2192
               self.ENODERPC, pnode, "instance %s, connection to"
2193
               " primary node failed", instance)
2194

    
2195
      if pnode_img.offline:
2196
        inst_nodes_offline.append(pnode)
2197

    
2198
      # If the instance is non-redundant we cannot survive losing its primary
2199
      # node, so we are not N+1 compliant. On the other hand we have no disk
2200
      # templates with more than one secondary so that situation is not well
2201
      # supported either.
2202
      # FIXME: does not support file-backed instances
2203
      if not inst_config.secondary_nodes:
2204
        i_non_redundant.append(instance)
2205
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2206
               instance, "instance has multiple secondary nodes: %s",
2207
               utils.CommaJoin(inst_config.secondary_nodes),
2208
               code=self.ETYPE_WARNING)
2209

    
2210
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2211
        i_non_a_balanced.append(instance)
2212

    
2213
      for snode in inst_config.secondary_nodes:
2214
        s_img = node_image[snode]
2215
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2216
                 "instance %s, connection to secondary node failed", instance)
2217

    
2218
        if s_img.offline:
2219
          inst_nodes_offline.append(snode)
2220

    
2221
      # warn that the instance lives on offline nodes
2222
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2223
               "instance lives on offline node(s) %s",
2224
               utils.CommaJoin(inst_nodes_offline))
2225
      # ... or ghost nodes
2226
      for node in inst_config.all_nodes:
2227
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2228
                 "instance lives on ghost node %s", node)
2229

    
2230
    feedback_fn("* Verifying orphan volumes")
2231
    self._VerifyOrphanVolumes(node_vol_should, node_image)
2232

    
2233
    feedback_fn("* Verifying orphan instances")
2234
    self._VerifyOrphanInstances(instancelist, node_image)
2235

    
2236
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2237
      feedback_fn("* Verifying N+1 Memory redundancy")
2238
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2239

    
2240
    feedback_fn("* Other Notes")
2241
    if i_non_redundant:
2242
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2243
                  % len(i_non_redundant))
2244

    
2245
    if i_non_a_balanced:
2246
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2247
                  % len(i_non_a_balanced))
2248

    
2249
    if n_offline:
2250
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2251

    
2252
    if n_drained:
2253
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2254

    
2255
    return not self.bad
2256

    
2257
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2258
    """Analyze the post-hooks' result
2259

2260
    This method analyses the hook result, handles it, and sends some
2261
    nicely-formatted feedback back to the user.
2262

2263
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2264
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2265
    @param hooks_results: the results of the multi-node hooks rpc call
2266
    @param feedback_fn: function used to send feedback back to the caller
2267
    @param lu_result: previous Exec result
2268
    @return: the new Exec result, based on the previous result
2269
        and hook results
2270

2271
    """
2272
    # We only really run POST phase hooks, and are only interested in
2273
    # their results
2274
    if phase == constants.HOOKS_PHASE_POST:
2275
      # Used to change hooks' output to proper indentation
2276
      indent_re = re.compile('^', re.M)
2277
      feedback_fn("* Hooks Results")
2278
      assert hooks_results, "invalid result from hooks"
2279

    
2280
      for node_name in hooks_results:
2281
        res = hooks_results[node_name]
2282
        msg = res.fail_msg
2283
        test = msg and not res.offline
2284
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2285
                      "Communication failure in hooks execution: %s", msg)
2286
        if res.offline or msg:
2287
          # No need to investigate payload if node is offline or gave an error.
2288
          # override manually lu_result here as _ErrorIf only
2289
          # overrides self.bad
2290
          lu_result = 1
2291
          continue
2292
        for script, hkr, output in res.payload:
2293
          test = hkr == constants.HKR_FAIL
2294
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2295
                        "Script %s failed, output:", script)
2296
          if test:
2297
            output = indent_re.sub('      ', output)
2298
            feedback_fn("%s" % output)
2299
            lu_result = 0
2300

    
2301
      return lu_result
2302

    
2303

    
2304
class LUVerifyDisks(NoHooksLU):
2305
  """Verifies the cluster disks status.
2306

2307
  """
2308
  REQ_BGL = False
2309

    
2310
  def ExpandNames(self):
2311
    self.needed_locks = {
2312
      locking.LEVEL_NODE: locking.ALL_SET,
2313
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2314
    }
2315
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2316

    
2317
  def Exec(self, feedback_fn):
2318
    """Verify integrity of cluster disks.
2319

2320
    @rtype: tuple of three items
2321
    @return: a tuple of (dict of node-to-node_error, list of instances
2322
        which need activate-disks, dict of instance: (node, volume) for
2323
        missing volumes)
2324

2325
    """
2326
    result = res_nodes, res_instances, res_missing = {}, [], {}
2327

    
2328
    vg_name = self.cfg.GetVGName()
2329
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2330
    instances = [self.cfg.GetInstanceInfo(name)
2331
                 for name in self.cfg.GetInstanceList()]
2332

    
2333
    nv_dict = {}
2334
    for inst in instances:
2335
      inst_lvs = {}
2336
      if (not inst.admin_up or
2337
          inst.disk_template not in constants.DTS_NET_MIRROR):
2338
        continue
2339
      inst.MapLVsByNode(inst_lvs)
2340
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2341
      for node, vol_list in inst_lvs.iteritems():
2342
        for vol in vol_list:
2343
          nv_dict[(node, vol)] = inst
2344

    
2345
    if not nv_dict:
2346
      return result
2347

    
2348
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
2349

    
2350
    for node in nodes:
2351
      # node_volume
2352
      node_res = node_lvs[node]
2353
      if node_res.offline:
2354
        continue
2355
      msg = node_res.fail_msg
2356
      if msg:
2357
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2358
        res_nodes[node] = msg
2359
        continue
2360

    
2361
      lvs = node_res.payload
2362
      for lv_name, (_, _, lv_online) in lvs.items():
2363
        inst = nv_dict.pop((node, lv_name), None)
2364
        if (not lv_online and inst is not None
2365
            and inst.name not in res_instances):
2366
          res_instances.append(inst.name)
2367

    
2368
    # any leftover items in nv_dict are missing LVs, let's arrange the
2369
    # data better
2370
    for key, inst in nv_dict.iteritems():
2371
      if inst.name not in res_missing:
2372
        res_missing[inst.name] = []
2373
      res_missing[inst.name].append(key)
2374

    
2375
    return result
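
    # Example return value (comment only, names are illustrative):
    #   ({"node3.example.com": "Error while listing LVs"},
    #    ["instance1.example.com"],
    #    {"instance2.example.com": [("node1.example.com", "xenvg/disk0")]})
    # i.e. per-node enumeration errors, instances needing activate-disks, and
    # per-instance missing (node, volume) pairs.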
2376

    
2377

    
2378
class LURepairDiskSizes(NoHooksLU):
2379
  """Verifies the cluster disks sizes.
2380

2381
  """
2382
  _OP_PARAMS = [("instances", _EmptyList, _TListOf(_TNonEmptyString))]
2383
  REQ_BGL = False
2384

    
2385
  def ExpandNames(self):
2386
    if self.op.instances:
2387
      self.wanted_names = []
2388
      for name in self.op.instances:
2389
        full_name = _ExpandInstanceName(self.cfg, name)
2390
        self.wanted_names.append(full_name)
2391
      self.needed_locks = {
2392
        locking.LEVEL_NODE: [],
2393
        locking.LEVEL_INSTANCE: self.wanted_names,
2394
        }
2395
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2396
    else:
2397
      self.wanted_names = None
2398
      self.needed_locks = {
2399
        locking.LEVEL_NODE: locking.ALL_SET,
2400
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2401
        }
2402
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2403

    
2404
  def DeclareLocks(self, level):
2405
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2406
      self._LockInstancesNodes(primary_only=True)
2407

    
2408
  def CheckPrereq(self):
2409
    """Check prerequisites.
2410

2411
    This only checks the optional instance list against the existing names.
2412

2413
    """
2414
    if self.wanted_names is None:
2415
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2416

    
2417
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2418
                             in self.wanted_names]
2419

    
2420
  def _EnsureChildSizes(self, disk):
2421
    """Ensure children of the disk have the needed disk size.
2422

2423
    This is valid mainly for DRBD8 and fixes an issue where the
2424
    children have smaller disk size.
2425

2426
    @param disk: an L{ganeti.objects.Disk} object
2427

2428
    """
2429
    if disk.dev_type == constants.LD_DRBD8:
2430
      assert disk.children, "Empty children for DRBD8?"
2431
      fchild = disk.children[0]
2432
      mismatch = fchild.size < disk.size
2433
      if mismatch:
2434
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2435
                     fchild.size, disk.size)
2436
        fchild.size = disk.size
2437

    
2438
      # and we recurse on this child only, not on the metadev
2439
      return self._EnsureChildSizes(fchild) or mismatch
2440
    else:
2441
      return False
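
    # Example (comment only): for a DRBD8 disk recorded at 10240 MiB whose
    # data child is recorded at 10200 MiB, the child is bumped to 10240 and
    # True is returned so the caller knows to write the configuration back;
    # the recursion only follows the data child, never the metadata device.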
2442

    
2443
  def Exec(self, feedback_fn):
2444
    """Verify the size of cluster disks.
2445

2446
    """
2447
    # TODO: check child disks too
2448
    # TODO: check differences in size between primary/secondary nodes
2449
    per_node_disks = {}
2450
    for instance in self.wanted_instances:
2451
      pnode = instance.primary_node
2452
      if pnode not in per_node_disks:
2453
        per_node_disks[pnode] = []
2454
      for idx, disk in enumerate(instance.disks):
2455
        per_node_disks[pnode].append((instance, idx, disk))
2456

    
2457
    changed = []
2458
    for node, dskl in per_node_disks.items():
2459
      newl = [v[2].Copy() for v in dskl]
2460
      for dsk in newl:
2461
        self.cfg.SetDiskID(dsk, node)
2462
      result = self.rpc.call_blockdev_getsizes(node, newl)
2463
      if result.fail_msg:
2464
        self.LogWarning("Failure in blockdev_getsizes call to node"
2465
                        " %s, ignoring", node)
2466
        continue
2467
      if len(result.data) != len(dskl):
2468
        self.LogWarning("Invalid result from node %s, ignoring node results",
2469
                        node)
2470
        continue
2471
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2472
        if size is None:
2473
          self.LogWarning("Disk %d of instance %s did not return size"
2474
                          " information, ignoring", idx, instance.name)
2475
          continue
2476
        if not isinstance(size, (int, long)):
2477
          self.LogWarning("Disk %d of instance %s did not return valid"
2478
                          " size information, ignoring", idx, instance.name)
2479
          continue
2480
        size = size >> 20
2481
        if size != disk.size:
2482
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2483
                       " correcting: recorded %d, actual %d", idx,
2484
                       instance.name, disk.size, size)
2485
          disk.size = size
2486
          self.cfg.Update(instance, feedback_fn)
2487
          changed.append((instance.name, idx, size))
2488
        if self._EnsureChildSizes(disk):
2489
          self.cfg.Update(instance, feedback_fn)
2490
          changed.append((instance.name, idx, disk.size))
2491
    return changed
2492

    
2493

    
2494
class LURenameCluster(LogicalUnit):
2495
  """Rename the cluster.
2496

2497
  """
2498
  HPATH = "cluster-rename"
2499
  HTYPE = constants.HTYPE_CLUSTER
2500
  _OP_PARAMS = [("name", _NoDefault, _TNonEmptyString)]
2501

    
2502
  def BuildHooksEnv(self):
2503
    """Build hooks env.
2504

2505
    """
2506
    env = {
2507
      "OP_TARGET": self.cfg.GetClusterName(),
2508
      "NEW_NAME": self.op.name,
2509
      }
2510
    mn = self.cfg.GetMasterNode()
2511
    all_nodes = self.cfg.GetNodeList()
2512
    return env, [mn], all_nodes
2513

    
2514
  def CheckPrereq(self):
2515
    """Verify that the passed name is a valid one.
2516

2517
    """
2518
    hostname = netutils.GetHostInfo(self.op.name)
2519

    
2520
    new_name = hostname.name
2521
    self.ip = new_ip = hostname.ip
2522
    old_name = self.cfg.GetClusterName()
2523
    old_ip = self.cfg.GetMasterIP()
2524
    if new_name == old_name and new_ip == old_ip:
2525
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2526
                                 " cluster has changed",
2527
                                 errors.ECODE_INVAL)
2528
    if new_ip != old_ip:
2529
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2530
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2531
                                   " reachable on the network. Aborting." %
2532
                                   new_ip, errors.ECODE_NOTUNIQUE)
2533

    
2534
    self.op.name = new_name
2535

    
2536
  def Exec(self, feedback_fn):
2537
    """Rename the cluster.
2538

2539
    """
2540
    clustername = self.op.name
2541
    ip = self.ip
2542

    
2543
    # shutdown the master IP
2544
    master = self.cfg.GetMasterNode()
2545
    result = self.rpc.call_node_stop_master(master, False)
2546
    result.Raise("Could not disable the master role")
2547

    
2548
    try:
2549
      cluster = self.cfg.GetClusterInfo()
2550
      cluster.cluster_name = clustername
2551
      cluster.master_ip = ip
2552
      self.cfg.Update(cluster, feedback_fn)
2553

    
2554
      # update the known hosts file
2555
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2556
      node_list = self.cfg.GetNodeList()
2557
      try:
2558
        node_list.remove(master)
2559
      except ValueError:
2560
        pass
2561
      result = self.rpc.call_upload_file(node_list,
2562
                                         constants.SSH_KNOWN_HOSTS_FILE)
2563
      for to_node, to_result in result.iteritems():
2564
        msg = to_result.fail_msg
2565
        if msg:
2566
          msg = ("Copy of file %s to node %s failed: %s" %
2567
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
2568
          self.proc.LogWarning(msg)
2569

    
2570
    finally:
2571
      result = self.rpc.call_node_start_master(master, False, False)
2572
      msg = result.fail_msg
2573
      if msg:
2574
        self.LogWarning("Could not re-enable the master role on"
2575
                        " the master, please restart manually: %s", msg)
2576

    
2577

    
2578
class LUSetClusterParams(LogicalUnit):
2579
  """Change the parameters of the cluster.
2580

2581
  """
2582
  HPATH = "cluster-modify"
2583
  HTYPE = constants.HTYPE_CLUSTER
2584
  _OP_PARAMS = [
2585
    ("vg_name", None, _TMaybeString),
2586
    ("enabled_hypervisors", None,
2587
     _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)),
2588
    ("hvparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2589
    ("beparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2590
    ("os_hvp", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2591
    ("osparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
2592
    ("candidate_pool_size", None, _TOr(_TStrictPositiveInt, _TNone)),
2593
    ("uid_pool", None, _NoType),
2594
    ("add_uids", None, _NoType),
2595
    ("remove_uids", None, _NoType),
2596
    ("maintain_node_health", None, _TMaybeBool),
2597
    ("nicparams", None, _TOr(_TDict, _TNone)),
2598
    ("drbd_helper", None, _TOr(_TString, _TNone)),
2599
    ("default_iallocator", None, _TMaybeString),
2600
    ]
2601
  REQ_BGL = False
2602

    
2603
  def CheckArguments(self):
2604
    """Check parameters
2605

2606
    """
2607
    if self.op.uid_pool:
2608
      uidpool.CheckUidPool(self.op.uid_pool)
2609

    
2610
    if self.op.add_uids:
2611
      uidpool.CheckUidPool(self.op.add_uids)
2612

    
2613
    if self.op.remove_uids:
2614
      uidpool.CheckUidPool(self.op.remove_uids)
2615

    
2616
  def ExpandNames(self):
2617
    # FIXME: in the future maybe other cluster params won't require checking on
2618
    # all nodes to be modified.
2619
    self.needed_locks = {
2620
      locking.LEVEL_NODE: locking.ALL_SET,
2621
    }
2622
    self.share_locks[locking.LEVEL_NODE] = 1
2623

    
2624
  def BuildHooksEnv(self):
2625
    """Build hooks env.
2626

2627
    """
2628
    env = {
2629
      "OP_TARGET": self.cfg.GetClusterName(),
2630
      "NEW_VG_NAME": self.op.vg_name,
2631
      }
2632
    mn = self.cfg.GetMasterNode()
2633
    return env, [mn], [mn]
2634

    
2635
  def CheckPrereq(self):
2636
    """Check prerequisites.
2637

2638
    This checks whether the given params don't conflict and
2639
    if the given volume group is valid.
2640

2641
    """
2642
    if self.op.vg_name is not None and not self.op.vg_name:
2643
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
2644
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
2645
                                   " instances exist", errors.ECODE_INVAL)
2646

    
2647
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
2648
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
2649
        raise errors.OpPrereqError("Cannot disable drbd helper while"
2650
                                   " drbd-based instances exist",
2651
                                   errors.ECODE_INVAL)
2652

    
2653
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2654

    
2655
    # if vg_name not None, checks given volume group on all nodes
2656
    if self.op.vg_name:
2657
      vglist = self.rpc.call_vg_list(node_list)
2658
      for node in node_list:
2659
        msg = vglist[node].fail_msg
2660
        if msg:
2661
          # ignoring down node
2662
          self.LogWarning("Error while gathering data on node %s"
2663
                          " (ignoring node): %s", node, msg)
2664
          continue
2665
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2666
                                              self.op.vg_name,
2667
                                              constants.MIN_VG_SIZE)
2668
        if vgstatus:
2669
          raise errors.OpPrereqError("Error on node '%s': %s" %
2670
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2671

    
2672
    if self.op.drbd_helper:
2673
      # checks given drbd helper on all nodes
2674
      helpers = self.rpc.call_drbd_helper(node_list)
2675
      for node in node_list:
2676
        ninfo = self.cfg.GetNodeInfo(node)
2677
        if ninfo.offline:
2678
          self.LogInfo("Not checking drbd helper on offline node %s", node)
2679
          continue
2680
        msg = helpers[node].fail_msg
2681
        if msg:
2682
          raise errors.OpPrereqError("Error checking drbd helper on node"
2683
                                     " '%s': %s" % (node, msg),
2684
                                     errors.ECODE_ENVIRON)
2685
        node_helper = helpers[node].payload
2686
        if node_helper != self.op.drbd_helper:
2687
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
2688
                                     (node, node_helper), errors.ECODE_ENVIRON)
2689

    
2690
    self.cluster = cluster = self.cfg.GetClusterInfo()
2691
    # validate params changes
2692
    if self.op.beparams:
2693
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2694
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2695

    
2696
    if self.op.nicparams:
2697
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2698
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2699
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2700
      nic_errors = []
2701

    
2702
      # check all instances for consistency
2703
      for instance in self.cfg.GetAllInstancesInfo().values():
2704
        for nic_idx, nic in enumerate(instance.nics):
2705
          params_copy = copy.deepcopy(nic.nicparams)
2706
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2707

    
2708
          # check parameter syntax
2709
          try:
2710
            objects.NIC.CheckParameterSyntax(params_filled)
2711
          except errors.ConfigurationError, err:
2712
            nic_errors.append("Instance %s, nic/%d: %s" %
2713
                              (instance.name, nic_idx, err))
2714

    
2715
          # if we're moving instances to routed, check that they have an ip
2716
          target_mode = params_filled[constants.NIC_MODE]
2717
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2718
            nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
2719
                              (instance.name, nic_idx))
2720
      if nic_errors:
2721
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2722
                                   "\n".join(nic_errors))
2723

    
2724
    # hypervisor list/parameters
2725
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
2726
    if self.op.hvparams:
2727
      for hv_name, hv_dict in self.op.hvparams.items():
2728
        if hv_name not in self.new_hvparams:
2729
          self.new_hvparams[hv_name] = hv_dict
2730
        else:
2731
          self.new_hvparams[hv_name].update(hv_dict)
2732

    
2733
    # os hypervisor parameters
2734
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2735
    if self.op.os_hvp:
2736
      for os_name, hvs in self.op.os_hvp.items():
2737
        if os_name not in self.new_os_hvp:
2738
          self.new_os_hvp[os_name] = hvs
2739
        else:
2740
          for hv_name, hv_dict in hvs.items():
2741
            if hv_name not in self.new_os_hvp[os_name]:
2742
              self.new_os_hvp[os_name][hv_name] = hv_dict
2743
            else:
2744
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2745

    
2746
    # os parameters
2747
    self.new_osp = objects.FillDict(cluster.osparams, {})
2748
    if self.op.osparams:
2749
      for os_name, osp in self.op.osparams.items():
2750
        if os_name not in self.new_osp:
2751
          self.new_osp[os_name] = {}
2752

    
2753
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
2754
                                                  use_none=True)
2755

    
2756
        if not self.new_osp[os_name]:
2757
          # we removed all parameters
2758
          del self.new_osp[os_name]
2759
        else:
2760
          # check the parameter validity (remote check)
2761
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
2762
                         os_name, self.new_osp[os_name])
2763

    
2764
    # changes to the hypervisor list
2765
    if self.op.enabled_hypervisors is not None:
2766
      self.hv_list = self.op.enabled_hypervisors
2767
      for hv in self.hv_list:
2768
        # if the hypervisor doesn't already exist in the cluster
2769
        # hvparams, we initialize it to empty, and then (in both
2770
        # cases) we make sure to fill the defaults, as we might not
2771
        # have a complete defaults list if the hypervisor wasn't
2772
        # enabled before
2773
        if hv not in new_hvp:
2774
          new_hvp[hv] = {}
2775
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
2776
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
2777
    else:
2778
      self.hv_list = cluster.enabled_hypervisors
2779

    
2780
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2781
      # either the enabled list has changed, or the parameters have, validate
2782
      for hv_name, hv_params in self.new_hvparams.items():
2783
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2784
            (self.op.enabled_hypervisors and
2785
             hv_name in self.op.enabled_hypervisors)):
2786
          # either this is a new hypervisor, or its parameters have changed
2787
          hv_class = hypervisor.GetHypervisor(hv_name)
2788
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2789
          hv_class.CheckParameterSyntax(hv_params)
2790
          _CheckHVParams(self, node_list, hv_name, hv_params)
2791

    
2792
    if self.op.os_hvp:
2793
      # no need to check any newly-enabled hypervisors, since the
2794
      # defaults have already been checked in the above code-block
2795
      for os_name, os_hvp in self.new_os_hvp.items():
2796
        for hv_name, hv_params in os_hvp.items():
2797
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2798
          # we need to fill in the new os_hvp on top of the actual hv_p
2799
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2800
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2801
          hv_class = hypervisor.GetHypervisor(hv_name)
2802
          hv_class.CheckParameterSyntax(new_osp)
2803
          _CheckHVParams(self, node_list, hv_name, new_osp)
2804

    
2805
    if self.op.default_iallocator:
2806
      alloc_script = utils.FindFile(self.op.default_iallocator,
2807
                                    constants.IALLOCATOR_SEARCH_PATH,
2808
                                    os.path.isfile)
2809
      if alloc_script is None:
2810
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
2811
                                   " specified" % self.op.default_iallocator,
2812
                                   errors.ECODE_INVAL)
2813

    
2814
  def Exec(self, feedback_fn):
2815
    """Change the parameters of the cluster.
2816

2817
    """
2818
    if self.op.vg_name is not None:
2819
      new_volume = self.op.vg_name
2820
      if not new_volume:
2821
        new_volume = None
2822
      if new_volume != self.cfg.GetVGName():
2823
        self.cfg.SetVGName(new_volume)
2824
      else:
2825
        feedback_fn("Cluster LVM configuration already in desired"
2826
                    " state, not changing")
2827
    if self.op.drbd_helper is not None:
2828
      new_helper = self.op.drbd_helper
2829
      if not new_helper:
2830
        new_helper = None
2831
      if new_helper != self.cfg.GetDRBDHelper():
2832
        self.cfg.SetDRBDHelper(new_helper)
2833
      else:
2834
        feedback_fn("Cluster DRBD helper already in desired state,"
2835
                    " not changing")
2836
    if self.op.hvparams:
2837
      self.cluster.hvparams = self.new_hvparams
2838
    if self.op.os_hvp:
2839
      self.cluster.os_hvp = self.new_os_hvp
2840
    if self.op.enabled_hypervisors is not None:
2841
      self.cluster.hvparams = self.new_hvparams
2842
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2843
    if self.op.beparams:
2844
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2845
    if self.op.nicparams:
2846
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2847
    if self.op.osparams:
2848
      self.cluster.osparams = self.new_osp
2849

    
2850
    if self.op.candidate_pool_size is not None:
2851
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2852
      # we need to update the pool size here, otherwise the save will fail
2853
      _AdjustCandidatePool(self, [])
2854

    
2855
    if self.op.maintain_node_health is not None:
2856
      self.cluster.maintain_node_health = self.op.maintain_node_health
2857

    
2858
    if self.op.add_uids is not None:
2859
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
2860

    
2861
    if self.op.remove_uids is not None:
2862
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
2863

    
2864
    if self.op.uid_pool is not None:
2865
      self.cluster.uid_pool = self.op.uid_pool
2866

    
2867
    if self.op.default_iallocator is not None:
2868
      self.cluster.default_iallocator = self.op.default_iallocator
2869

    
2870
    self.cfg.Update(self.cluster, feedback_fn)
2871

    
2872

    
2873
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
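  # nothing to wait for if the instance has no disks at all, or if an
  # explicit but empty disk list was requested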
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
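    # each mstat describes the mirror status of one disk; only its
    # is_degraded, sync_percent and estimated_time fields are used below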
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

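    # sleep at most one minute between polls, or less if the estimated
    # remaining sync time is shorter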
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

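  # a device is only consistent if all of its children are; note that the
  # ldisk flag is not propagated to the recursive calls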
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_PARAMS = [
    _POutputFields,
    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants",
                                   "parameters", "api_versions")

  def CheckArguments(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary
3082

3083
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []

    for os_name, os_data in pol.items():
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
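      # an OS is reported as valid only if every node has at least one entry
      # for it and that entry's status is OK; variants, parameters and API
      # versions are intersected so that only values common to all nodes
      # are returned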
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = list(variants)
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

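    # refuse the removal while any instance still uses this node as its
    # primary or as one of its secondaries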
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

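    # ask the node to leave the cluster (stop its daemons and, if
    # configured, clean up the ssh setup); failures are only warnings as
    # the node has already been removed from the configuration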
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      # FIXME: this should be done via an rpc call to node daemon
      utils.RemoveHostFromEtcHosts(node.name)
      _RedistributeAncillaryFiles(self)


class LUQueryNodes(NoHooksLU):
3268
  """Logical unit for querying nodes.
3269

3270
  """
3271
  # pylint: disable-msg=W0142
3272
  _OP_PARAMS = [
3273
    _POutputFields,
3274
    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
3275
    ("use_locking", False, _TBool),
3276
    ]
3277
  REQ_BGL = False
3278

    
3279
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
3280
                    "master_candidate", "offline", "drained"]
3281

    
3282
  _FIELDS_DYNAMIC = utils.FieldSet(
3283
    "dtotal", "dfree",
3284
    "mtotal", "mnode", "mfree",
3285
    "bootid",
3286
    "ctotal", "cnodes", "csockets",
3287
    )
3288

    
3289
  _FIELDS_STATIC = utils.FieldSet(*[
3290
    "pinst_cnt", "sinst_cnt",
3291
    "pinst_list", "sinst_list",
3292
    "pip", "sip", "tags",
3293
    "master",
3294
    "role"] + _SIMPLE_FIELDS
3295
    )
3296

    
3297
  def CheckArguments(self):
3298
    _CheckOutputFields(static=self._FIELDS_STATIC,
3299
                       dynamic=self._FIELDS_DYNAMIC,
3300
                       selected=self.op.output_fields)
3301

    
3302
  def ExpandNames(self):
3303
    self.needed_locks = {}
3304
    self.share_locks[locking.LEVEL_NODE] = 1
3305

    
3306
    if self.op.names:
3307
      self.wanted = _GetWantedNodes(self, self.op.names)
3308
    else:
3309
      self.wanted = locking.ALL_SET
3310

    
3311
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3312
    self.do_locking = self.do_node_query and self.op.use_locking
3313
    if self.do_locking:
3314
      # if we don't request only static fields, we need to lock the nodes
3315
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
3316

    
3317
  def Exec(self, feedback_fn):
3318
    """Computes the list of nodes and their attributes.
3319

3320
    """
3321
    all_info = self.cfg.GetAllNodesInfo()
3322
    if self.do_locking:
3323
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
3324
    elif self.wanted != locking.ALL_SET:
3325
      nodenames = self.wanted
3326
      missing = set(nodenames).difference(all_info.keys())
3327
      if missing:
3328
        raise errors.OpExecError(
3329
          "Some nodes were removed before retrieving their data: %s" % missing)
3330
    else:
3331
      nodenames = all_info.keys()
3332

    
3333
    nodenames = utils.NiceSort(nodenames)
3334
    nodelist = [all_info[name] for name in nodenames]
3335

    
3336
    # begin data gathering
3337

    
3338
    if self.do_node_query:
3339
      live_data = {}
3340
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3341
                                          self.cfg.GetHypervisorType())
3342
      for name in nodenames:
3343
        nodeinfo = node_data[name]
3344
        if not nodeinfo.fail_msg and nodeinfo.payload:
3345
          nodeinfo = nodeinfo.payload
3346
          fn = utils.TryConvert
3347
          live_data[name] = {
3348
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
3349
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
3350
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
3351
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
3352
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
3353
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
3354
            "bootid": nodeinfo.get('bootid', None),
3355
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
3356
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
3357
            }
3358
        else:
3359
          live_data[name] = {}
3360
    else:
3361
      live_data = dict.fromkeys(nodenames, {})
3362

    
3363
    node_to_primary = dict([(name, set()) for name in nodenames])
3364
    node_to_secondary = dict([(name, set()) for name in nodenames])
3365

    
3366
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
3367
                             "sinst_cnt", "sinst_list"))
3368
    if inst_fields & frozenset(self.op.output_fields):
3369
      inst_data = self.cfg.GetAllInstancesInfo()
3370

    
3371
      for inst in inst_data.values():
3372
        if inst.primary_node in node_to_primary:
3373
          node_to_primary[inst.primary_node].add(inst.name)
3374
        for secnode in inst.secondary_nodes:
3375
          if secnode in node_to_secondary:
3376
            node_to_secondary[secnode].add(inst.name)
3377

    
3378
    master_node = self.cfg.GetMasterNode()
3379

    
3380
    # end data gathering
3381

    
3382
    output = []
3383
    for node in nodelist:
3384
      node_output = []
3385
      for field in self.op.output_fields:
3386
        if field in self._SIMPLE_FIELDS:
3387
          val = getattr(node, field)
3388
        elif field == "pinst_list":
3389
          val = list(node_to_primary[node.name])
3390
        elif field == "sinst_list":
3391
          val = list(node_to_secondary[node.name])
3392
        elif field == "pinst_cnt":
3393
          val = len(node_to_primary[node.name])
3394
        elif field == "sinst_cnt":
3395
          val = len(node_to_secondary[node.name])
3396
        elif field == "pip":
3397
          val = node.primary_ip
3398
        elif field == "sip":
3399
          val = node.secondary_ip
3400
        elif field == "tags":
3401
          val = list(node.GetTags())
3402
        elif field == "master":
3403
          val = node.name == master_node
3404
        elif self._FIELDS_DYNAMIC.Matches(field):
3405
          val = live_data[node.name].get(field, None)
3406
        elif field == "role":
3407
          if node.name == master_node:
3408
            val = "M"
3409
          elif node.master_candidate:
3410
            val = "C"
3411
          elif node.drained:
3412
            val = "D"
3413
          elif node.offline:
3414
            val = "O"
3415
          else:
3416
            val = "R"
3417
        else:
3418
          raise errors.ParameterError(field)
3419
        node_output.append(val)
3420
      output.append(node_output)
3421

    
3422
    return output
3423

    
3424

    
3425
class LUQueryNodeVolumes(NoHooksLU):
3426
  """Logical unit for getting volumes on node(s).
3427

3428
  """
3429
  _OP_PARAMS = [
3430
    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
3431
    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
3432
    ]
3433
  REQ_BGL = False
3434
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
3435
  _FIELDS_STATIC = utils.FieldSet("node")
3436

    
3437
  def CheckArguments(self):
3438
    _CheckOutputFields(static=self._FIELDS_STATIC,
3439
                       dynamic=self._FIELDS_DYNAMIC,
3440
                       selected=self.op.output_fields)
3441

    
3442
  def ExpandNames(self):
3443
    self.needed_locks = {}
3444
    self.share_locks[locking.LEVEL_NODE] = 1
3445
    if not self.op.nodes:
3446
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3447
    else:
3448
      self.needed_locks[locking.LEVEL_NODE] = \
3449
        _GetWantedNodes(self, self.op.nodes)
3450

    
3451
  def Exec(self, feedback_fn):
3452
    """Computes the list of nodes and their attributes.
3453

3454
    """
3455
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
3456
    volumes = self.rpc.call_node_volumes(nodenames)
3457

    
3458
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
3459
             in self.cfg.GetInstanceList()]
3460

    
3461
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
3462

    
3463
    output = []
3464
    for node in nodenames:
3465
      nresult = volumes[node]
3466
      if nresult.offline:
3467
        continue
3468
      msg = nresult.fail_msg
3469
      if msg:
3470
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
3471
        continue
3472

    
3473
      node_vols = nresult.payload[:]
3474
      node_vols.sort(key=lambda vol: vol['dev'])
3475

    
3476
      for vol in node_vols:
3477
        node_output = []
3478
        for field in self.op.output_fields:
3479
          if field == "node":
3480
            val = node
3481
          elif field == "phys":
3482
            val = vol['dev']
3483
          elif field == "vg":
3484
            val = vol['vg']
3485
          elif field == "name":
3486
            val = vol['name']
3487
          elif field == "size":
3488
            val = int(float(vol['size']))
3489
          elif field == "instance":
3490
            for inst in ilist:
3491
              if node not in lv_by_node[inst]:
3492
                continue
3493
              if vol['name'] in lv_by_node[inst][node]:
3494
                val = inst.name
3495
                break
3496
            else:
3497
              val = '-'
3498
          else:
3499
            raise errors.ParameterError(field)
3500
          node_output.append(str(val))
3501

    
3502
        output.append(node_output)
3503

    
3504
    return output
3505

    
3506

    
3507
class LUQueryNodeStorage(NoHooksLU):
3508
  """Logical unit for getting information on storage units on node(s).
3509

3510
  """
3511
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
3512
  _OP_PARAMS = [
3513
    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
3514
    ("storage_type", _NoDefault, _CheckStorageType),
3515
    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
3516
    ("name", None, _TMaybeString),
3517
    ]
3518
  REQ_BGL = False
3519

    
3520
  def CheckArguments(self):
3521
    _CheckOutputFields(static=self._FIELDS_STATIC,
3522
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
3523
                       selected=self.op.output_fields)
3524

    
3525
  def ExpandNames(self):
3526
    self.needed_locks = {}
3527
    self.share_locks[locking.LEVEL_NODE] = 1
3528

    
3529
    if self.op.nodes:
3530
      self.needed_locks[locking.LEVEL_NODE] = \
3531
        _GetWantedNodes(self, self.op.nodes)
3532
    else:
3533
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3534

    
3535
  def Exec(self, feedback_fn):
3536
    """Computes the list of nodes and their attributes.
3537

3538
    """
3539
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
3540

    
3541
    # Always get name to sort by
3542
    if constants.SF_NAME in self.op.output_fields:
3543
      fields = self.op.output_fields[:]
3544
    else:
3545
      fields = [constants.SF_NAME] + self.op.output_fields
3546

    
3547
    # Never ask for node or type as it's only known to the LU
3548
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
3549
      while extra in fields:
3550
        fields.remove(extra)
3551

    
3552
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
3553
    name_idx = field_idx[constants.SF_NAME]
3554

    
3555
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3556
    data = self.rpc.call_storage_list(self.nodes,
3557
                                      self.op.storage_type, st_args,
3558
                                      self.op.name, fields)
3559

    
3560
    result = []
3561

    
3562
    for node in utils.NiceSort(self.nodes):
3563
      nresult = data[node]
3564
      if nresult.offline:
3565
        continue
3566

    
3567
      msg = nresult.fail_msg
3568
      if msg:
3569
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
3570
        continue
3571

    
3572
      rows = dict([(row[name_idx], row) for row in nresult.payload])
3573

    
3574
      for name in utils.NiceSort(rows.keys()):
3575
        row = rows[name]
3576

    
3577
        out = []
3578

    
3579
        for field in self.op.output_fields:
3580
          if field == constants.SF_NODE:
3581
            val = node
3582
          elif field == constants.SF_TYPE:
3583
            val = self.op.storage_type
3584
          elif field in field_idx:
3585
            val = row[field_idx[field]]
3586
          else:
3587
            raise errors.ParameterError(field)
3588

    
3589
          out.append(val)
3590

    
3591
        result.append(out)
3592

    
3593
    return result
3594

    
3595

    
3596
class LUModifyNodeStorage(NoHooksLU):
3597
  """Logical unit for modifying a storage volume on a node.
3598

3599
  """
3600
  _OP_PARAMS = [
3601
    _PNodeName,
3602
    ("storage_type", _NoDefault, _CheckStorageType),
3603
    ("name", _NoDefault, _TNonEmptyString),
3604
    ("changes", _NoDefault, _TDict),
3605
    ]
3606
  REQ_BGL = False
3607

    
3608
  def CheckArguments(self):
3609
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3610

    
3611
    storage_type = self.op.storage_type
3612

    
3613
    try:
3614
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
3615
    except KeyError:
3616
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
3617
                                 " modified" % storage_type,
3618
                                 errors.ECODE_INVAL)
3619

    
3620
    diff = set(self.op.changes.keys()) - modifiable
3621
    if diff:
3622
      raise errors.OpPrereqError("The following fields can not be modified for"
3623
                                 " storage units of type '%s': %r" %
3624
                                 (storage_type, list(diff)),
3625
                                 errors.ECODE_INVAL)
3626

    
3627
  def ExpandNames(self):
3628
    self.needed_locks = {
3629
      locking.LEVEL_NODE: self.op.node_name,
3630
      }
3631

    
3632
  def Exec(self, feedback_fn):
3633
    """Computes the list of nodes and their attributes.
3634

3635
    """
3636
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3637
    result = self.rpc.call_storage_modify(self.op.node_name,
3638
                                          self.op.storage_type, st_args,
3639
                                          self.op.name, self.op.changes)
3640
    result.Raise("Failed to modify storage unit '%s' on %s" %
3641
                 (self.op.name, self.op.node_name))
3642

    
3643

    
3644
class LUAddNode(LogicalUnit):
3645
  """Logical unit for adding node to the cluster.
3646

3647
  """
3648
  HPATH = "node-add"
3649
  HTYPE = constants.HTYPE_NODE
3650
  _OP_PARAMS = [
3651
    _PNodeName,
3652
    ("primary_ip", None, _NoType),
3653
    ("secondary_ip", None, _TMaybeString),
3654
    ("readd", False, _TBool),
3655
    ]
3656

    
3657
  def CheckArguments(self):
3658
    # validate/normalize the node name
3659
    self.op.node_name = netutils.HostInfo.NormalizeName(self.op.node_name)
3660

    
3661
  def BuildHooksEnv(self):
3662
    """Build hooks env.
3663

3664
    This will run on all nodes before, and on all nodes + the new node after.
3665

3666
    """
3667
    env = {
3668
      "OP_TARGET": self.op.node_name,
3669
      "NODE_NAME": self.op.node_name,
3670
      "NODE_PIP": self.op.primary_ip,
3671
      "NODE_SIP": self.op.secondary_ip,
3672
      }
3673
    nodes_0 = self.cfg.GetNodeList()
3674
    nodes_1 = nodes_0 + [self.op.node_name, ]
3675
    return env, nodes_0, nodes_1
3676

    
3677
  def CheckPrereq(self):
3678
    """Check prerequisites.
3679

3680
    This checks:
3681
     - the new node is not already in the config
3682
     - it is resolvable
3683
     - its parameters (single/dual homed) match the cluster
3684

3685
    Any errors are signaled by raising errors.OpPrereqError.
3686

3687
    """
3688
    node_name = self.op.node_name
3689
    cfg = self.cfg
3690

    
3691
    dns_data = netutils.GetHostInfo(node_name)
3692

    
3693
    node = dns_data.name
3694
    primary_ip = self.op.primary_ip = dns_data.ip
3695
    if self.op.secondary_ip is None:
3696
      self.op.secondary_ip = primary_ip
3697
    if not netutils.IsValidIP4(self.op.secondary_ip):
3698
      raise errors.OpPrereqError("Invalid secondary IP given",
3699
                                 errors.ECODE_INVAL)
3700
    secondary_ip = self.op.secondary_ip
3701

    
3702
    node_list = cfg.GetNodeList()
3703
    if not self.op.readd and node in node_list:
3704
      raise errors.OpPrereqError("Node %s is already in the configuration" %
3705
                                 node, errors.ECODE_EXISTS)
3706
    elif self.op.readd and node not in node_list:
3707
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
3708
                                 errors.ECODE_NOENT)
3709

    
3710
    self.changed_primary_ip = False
3711

    
3712
    for existing_node_name in node_list:
3713
      existing_node = cfg.GetNodeInfo(existing_node_name)
3714

    
3715
      if self.op.readd and node == existing_node_name:
3716
        if existing_node.secondary_ip != secondary_ip:
3717
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
3718
                                     " address configuration as before",
3719
                                     errors.ECODE_INVAL)
3720
        if existing_node.primary_ip != primary_ip:
3721
          self.changed_primary_ip = True
3722

    
3723
        continue
3724

    
3725
      if (existing_node.primary_ip == primary_ip or
3726
          existing_node.secondary_ip == primary_ip or
3727
          existing_node.primary_ip == secondary_ip or
3728
          existing_node.secondary_ip == secondary_ip):
3729
        raise errors.OpPrereqError("New node ip address(es) conflict with"
3730
                                   " existing node %s" % existing_node.name,
3731
                                   errors.ECODE_NOTUNIQUE)
3732

    
3733
    # check that the type of the node (single versus dual homed) is the
3734
    # same as for the master
3735
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
3736
    master_singlehomed = myself.secondary_ip == myself.primary_ip
3737
    newbie_singlehomed = secondary_ip == primary_ip
3738
    if master_singlehomed != newbie_singlehomed:
3739
      if master_singlehomed:
3740
        raise errors.OpPrereqError("The master has no private ip but the"
3741
                                   " new node has one",
3742
                                   errors.ECODE_INVAL)
3743
      else:
3744
        raise errors.OpPrereqError("The master has a private ip but the"
3745
                                   " new node doesn't have one",
3746
                                   errors.ECODE_INVAL)
3747

    
3748
    # checks reachability
3749
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
3750
      raise errors.OpPrereqError("Node not reachable by ping",
3751
                                 errors.ECODE_ENVIRON)
3752

    
3753
    if not newbie_singlehomed:
3754
      # check reachability from my secondary ip to newbie's secondary ip
3755
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3756
                           source=myself.secondary_ip):
3757
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3758
                                   " based ping to noded port",
3759
                                   errors.ECODE_ENVIRON)
3760

    
3761
    if self.op.readd:
3762
      exceptions = [node]
3763
    else:
3764
      exceptions = []
3765

    
3766
    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3767

    
3768
    if self.op.readd:
3769
      self.new_node = self.cfg.GetNodeInfo(node)
3770
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
3771
    else:
3772
      self.new_node = objects.Node(name=node,
3773
                                   primary_ip=primary_ip,
3774
                                   secondary_ip=secondary_ip,
3775
                                   master_candidate=self.master_candidate,
3776
                                   offline=False, drained=False)
3777

    
3778
  def Exec(self, feedback_fn):
3779
    """Adds the new node to the cluster.
3780

3781
    """
3782
    new_node = self.new_node
3783
    node = new_node.name
3784

    
3785
    # for re-adds, reset the offline/drained/master-candidate flags;
3786
    # we need to reset here, otherwise offline would prevent RPC calls
3787
    # later in the procedure; this also means that if the re-add
3788
    # fails, we are left with a non-offlined, broken node
3789
    if self.op.readd:
3790
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3791
      self.LogInfo("Readding a node, the offline/drained flags were reset")
3792
      # if we demote the node, we do cleanup later in the procedure
3793
      new_node.master_candidate = self.master_candidate
3794
      if self.changed_primary_ip:
3795
        new_node.primary_ip = self.op.primary_ip
3796

    
3797
    # notify the user about any possible mc promotion
3798
    if new_node.master_candidate:
3799
      self.LogInfo("Node will be a master candidate")
3800

    
3801
    # check connectivity
3802
    result = self.rpc.call_version([node])[node]
3803
    result.Raise("Can't get version information from node %s" % node)
3804
    if constants.PROTOCOL_VERSION == result.payload:
3805
      logging.info("Communication to node %s fine, sw version %s match",
3806
                   node, result.payload)
3807
    else:
3808
      raise errors.OpExecError("Version mismatch master version %s,"
3809
                               " node version %s" %
3810
                               (constants.PROTOCOL_VERSION, result.payload))
3811

    
3812
    # setup ssh on node
3813
    if self.cfg.GetClusterInfo().modify_ssh_setup:
3814
      logging.info("Copy ssh key to node %s", node)
3815
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3816
      keyarray = []
3817
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3818
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3819
                  priv_key, pub_key]
3820

    
3821
      for i in keyfiles:
3822
        keyarray.append(utils.ReadFile(i))
3823

    
3824
      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3825
                                      keyarray[2], keyarray[3], keyarray[4],
3826
                                      keyarray[5])
3827
      result.Raise("Cannot transfer ssh keys to the new node")
3828

    
3829
    # Add node to our /etc/hosts, and add key to known_hosts
3830
    if self.cfg.GetClusterInfo().modify_etc_hosts:
3831
      # FIXME: this should be done via an rpc call to node daemon
3832
      utils.AddHostToEtcHosts(new_node.name)
3833

    
3834
    if new_node.secondary_ip != new_node.primary_ip:
3835
      result = self.rpc.call_node_has_ip_address(new_node.name,
3836
                                                 new_node.secondary_ip)
3837
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3838
                   prereq=True, ecode=errors.ECODE_ENVIRON)
3839
      if not result.payload:
3840
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3841
                                 " you gave (%s). Please fix and re-run this"
3842
                                 " command." % new_node.secondary_ip)
3843

    
3844
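    # finally, have the master verify ssh/hostname connectivity to the
    # new node before it is fully added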
    node_verify_list = [self.cfg.GetMasterNode()]
3845
    node_verify_param = {
3846
      constants.NV_NODELIST: [node],
3847
      # TODO: do a node-net-test as well?
3848
    }
3849

    
3850
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3851
                                       self.cfg.GetClusterName())
3852
    for verifier in node_verify_list:
3853
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
3854
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
3855
      if nl_payload:
3856
        for failed in nl_payload:
3857
          feedback_fn("ssh/hostname verification failed"
3858
                      " (checking from %s): %s" %
3859
                      (verifier, nl_payload[failed]))
3860
        raise errors.OpExecError("ssh/hostname verification failed.")
3861

    
3862
    if self.op.readd:
3863
      _RedistributeAncillaryFiles(self)
3864
      self.context.ReaddNode(new_node)
3865
      # make sure we redistribute the config
3866
      self.cfg.Update(new_node, feedback_fn)
3867
      # and make sure the new node will not have old files around
3868
      if not new_node.master_candidate:
3869
        result = self.rpc.call_node_demote_from_mc(new_node.name)
3870
        msg = result.fail_msg
3871
        if msg:
3872
          self.LogWarning("Node failed to demote itself from master"
3873
                          " candidate status: %s" % msg)
3874
    else:
3875
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
3876
      self.context.AddNode(new_node, self.proc.GetECId())
3877

    
3878

    
3879
class LUSetNodeParams(LogicalUnit):
3880
  """Modifies the parameters of a node.
3881

3882
  """
3883
  HPATH = "node-modify"
3884
  HTYPE = constants.HTYPE_NODE
3885
  _OP_PARAMS = [
3886
    _PNodeName,
3887
    ("master_candidate", None, _TMaybeBool),
3888
    ("offline", None, _TMaybeBool),
3889
    ("drained", None, _TMaybeBool),
3890
    ("auto_promote", False, _TBool),
3891
    _PForce,
3892
    ]
3893
  REQ_BGL = False
3894

    
3895
  def CheckArguments(self):
3896
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3897
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3898
    if all_mods.count(None) == 3:
3899
      raise errors.OpPrereqError("Please pass at least one modification",
3900
                                 errors.ECODE_INVAL)
3901
    if all_mods.count(True) > 1:
3902
      raise errors.OpPrereqError("Can't set the node into more than one"
3903
                                 " state at the same time",
3904
                                 errors.ECODE_INVAL)
3905

    
3906
    # Boolean value that tells us whether we're offlining or draining the node
3907
    self.offline_or_drain = (self.op.offline == True or
3908
                             self.op.drained == True)
3909
    self.deoffline_or_drain = (self.op.offline == False or
3910
                               self.op.drained == False)
3911
    self.might_demote = (self.op.master_candidate == False or
3912
                         self.offline_or_drain)
3913

    
3914
    self.lock_all = self.op.auto_promote and self.might_demote
3915

    
3916

    
3917
  def ExpandNames(self):
3918
    if self.lock_all:
3919
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
3920
    else:
3921
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3922

    
3923
  def BuildHooksEnv(self):
3924
    """Build hooks env.
3925

3926
    This runs on the master node.
3927

3928
    """
3929
    env = {
3930
      "OP_TARGET": self.op.node_name,
3931
      "MASTER_CANDIDATE": str(self.op.master_candidate),
3932
      "OFFLINE": str(self.op.offline),
3933
      "DRAINED": str(self.op.drained),
3934
      }
3935
    nl = [self.cfg.GetMasterNode(),
3936
          self.op.node_name]
3937
    return env, nl, nl
3938

    
3939
  def CheckPrereq(self):
3940
    """Check prerequisites.
3941

3942
    This only checks the instance list against the existing names.
3943

3944
    """
3945
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3946

    
3947
    if (self.op.master_candidate is not None or
3948
        self.op.drained is not None or
3949
        self.op.offline is not None):
3950
      # we can't change the master's node flags
3951
      if self.op.node_name == self.cfg.GetMasterNode():
3952
        raise errors.OpPrereqError("The master role can be changed"
3953
                                   " only via masterfailover",
3954
                                   errors.ECODE_INVAL)
3955

    
3956

    
3957
    if node.master_candidate and self.might_demote and not self.lock_all:
3958
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
3959
      # check if after removing the current node, we're missing master
3960
      # candidates
3961
      (mc_remaining, mc_should, _) = \
3962
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
3963
      if mc_remaining < mc_should:
3964
        raise errors.OpPrereqError("Not enough master candidates, please"
3965
                                   " pass auto_promote to allow promotion",
3966
                                   errors.ECODE_INVAL)
3967

    
3968
    if (self.op.master_candidate == True and
3969
        ((node.offline and not self.op.offline == False) or
3970
         (node.drained and not self.op.drained == False))):
3971
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3972
                                 " to master_candidate" % node.name,
3973
                                 errors.ECODE_INVAL)
3974

    
3975
    # If we're being deofflined/drained, we'll MC ourself if needed
3976
    if (self.deoffline_or_drain and not self.offline_or_drain and not
3977
        self.op.master_candidate == True and not node.master_candidate):
3978
      self.op.master_candidate = _DecideSelfPromotion(self)
3979
      if self.op.master_candidate:
3980
        self.LogInfo("Autopromoting node to master candidate")
3981

    
3982
    return
3983

    
3984
  def Exec(self, feedback_fn):
3985
    """Modifies a node.
3986

3987
    """
3988
    node = self.node
3989

    
3990
    result = []
3991
    changed_mc = False
3992

    
3993
    if self.op.offline is not None:
3994
      node.offline = self.op.offline
3995
      result.append(("offline", str(self.op.offline)))
3996
      if self.op.offline == True:
3997
        if node.master_candidate:
3998
          node.master_candidate = False
3999
          changed_mc = True
4000
          result.append(("master_candidate", "auto-demotion due to offline"))
4001
        if node.drained:
4002
          node.drained = False
4003
          result.append(("drained", "clear drained status due to offline"))
4004

    
4005
    if self.op.master_candidate is not None:
4006
      node.master_candidate = self.op.master_candidate
4007
      changed_mc = True
4008
      result.append(("master_candidate", str(self.op.master_candidate)))
4009
      if self.op.master_candidate == False:
4010
        rrc = self.rpc.call_node_demote_from_mc(node.name)
4011
        msg = rrc.fail_msg
4012
        if msg:
4013
          self.LogWarning("Node failed to demote itself: %s" % msg)
4014

    
4015
    if self.op.drained is not None:
4016
      node.drained = self.op.drained
4017
      result.append(("drained", str(self.op.drained)))
4018
      if self.op.drained == True:
4019
        if node.master_candidate:
4020
          node.master_candidate = False
4021
          changed_mc = True
4022
          result.append(("master_candidate", "auto-demotion due to drain"))
4023
          rrc = self.rpc.call_node_demote_from_mc(node.name)
4024
          msg = rrc.fail_msg
4025
          if msg:
4026
            self.LogWarning("Node failed to demote itself: %s" % msg)
4027
        if node.offline:
4028
          node.offline = False
4029
          result.append(("offline", "clear offline status due to drain"))
4030

    
4031
    # we locked all nodes, we adjust the CP before updating this node
4032
    if self.lock_all:
4033
      _AdjustCandidatePool(self, [node.name])
4034

    
4035
    # this will trigger configuration file update, if needed
4036
    self.cfg.Update(node, feedback_fn)
4037

    
4038
    # this will trigger job queue propagation or cleanup
4039
    if changed_mc:
4040
      self.context.ReaddNode(node)
4041

    
4042
    return result
4043

    
4044

    
4045
class LUPowercycleNode(NoHooksLU):
4046
  """Powercycles a node.
4047

4048
  """
4049
  _OP_PARAMS = [
4050
    _PNodeName,
4051
    _PForce,
4052
    ]
4053
  REQ_BGL = False
4054

    
4055
  def CheckArguments(self):
4056
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4057
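    # powercycling the master node is only allowed when the force flag is set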
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
4058
      raise errors.OpPrereqError("The node is the master and the force"
4059
                                 " parameter was not set",
4060
                                 errors.ECODE_INVAL)
4061

    
4062
  def ExpandNames(self):
4063
    """Locking for PowercycleNode.
4064

4065
    This is a last-resort option and shouldn't block on other
4066
    jobs. Therefore, we grab no locks.
4067

4068
    """
4069
    self.needed_locks = {}
4070

    
4071
  def