root / lib / cmdlib.py @ aa29e95f

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import ht
58
from ganeti import query
59
from ganeti import qlang
60

    
61
import ganeti.masterd.instance # pylint: disable-msg=W0611
62

    
63
# Common opcode attributes
64

    
65
#: output fields for a query operation
66
_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))
67

    
68

    
69
#: the shutdown timeout
70
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
71
                     ht.TPositiveInt)
72

    
73
#: the force parameter
74
_PForce = ("force", False, ht.TBool)
75

    
76
#: a required instance name (for single-instance LUs)
77
_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)
78

    
79
#: Whether to ignore offline nodes
80
_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)
81

    
82
#: a required node name (for single-node LUs)
83
_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)
84

    
85
#: a required node group name (for single-group LUs)
86
_PGroupName = ("group_name", ht.NoDefault, ht.TNonEmptyString)
87

    
88
#: the migration type (live/non-live)
89
_PMigrationMode = ("mode", None,
90
                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))
91

    
92
#: the obsolete 'live' mode (boolean)
93
_PMigrationLive = ("live", None, ht.TMaybeBool)
94

    
95

    
96
def _SupportsOob(cfg, node):
97
  """Tells if node supports OOB.
98

99
  @type cfg: L{config.ConfigWriter}
100
  @param cfg: The cluster configuration
101
  @type node: L{objects.Node}
102
  @param node: The node
103
  @return: The OOB script if supported or an empty string otherwise
104

105
  """
106
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
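# Usage sketch (illustrative only; "node_obj" is a hypothetical variable): the
# return value doubles as a boolean, since an empty string means the node has
# no OOB program configured.
#
#   oob_program = _SupportsOob(self.cfg, node_obj)
#   if not oob_program:
#     raise errors.OpPrereqError("OOB is not supported for node %s" %
#                                node_obj.name, errors.ECODE_STATE)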
107

    
108

    
109
# End types
110
class LogicalUnit(object):
111
  """Logical Unit base class.
112

113
  Subclasses must follow these rules:
114
    - implement ExpandNames
115
    - implement CheckPrereq (except when tasklets are used)
116
    - implement Exec (except when tasklets are used)
117
    - implement BuildHooksEnv
118
    - redefine HPATH and HTYPE
119
    - optionally redefine their run requirements:
120
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
121

122
  Note that all commands require root permissions.
123

124
  @ivar dry_run_result: the value (if any) that will be returned to the caller
125
      in dry-run mode (signalled by opcode dry_run parameter)
126
  @cvar _OP_PARAMS: a list of opcode attributes, the default values
127
      they should get if not already defined, and types they must match
128

129
  """
130
  HPATH = None
131
  HTYPE = None
132
  _OP_PARAMS = []
133
  REQ_BGL = True
134

    
135
  def __init__(self, processor, op, context, rpc):
136
    """Constructor for LogicalUnit.
137

138
    This needs to be overridden in derived classes in order to check op
139
    validity.
140

141
    """
142
    self.proc = processor
143
    self.op = op
144
    self.cfg = context.cfg
145
    self.context = context
146
    self.rpc = rpc
147
    # Dicts used to declare locking needs to mcpu
148
    self.needed_locks = None
149
    self.acquired_locks = {}
150
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
151
    self.add_locks = {}
152
    self.remove_locks = {}
153
    # Used to force good behavior when calling helper functions
154
    self.recalculate_locks = {}
155
    self.__ssh = None
156
    # logging
157
    self.Log = processor.Log # pylint: disable-msg=C0103
158
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
159
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
160
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
161
    # support for dry-run
162
    self.dry_run_result = None
163
    # support for generic debug attribute
164
    if (not hasattr(self.op, "debug_level") or
165
        not isinstance(self.op.debug_level, int)):
166
      self.op.debug_level = 0
167

    
168
    # Tasklets
169
    self.tasklets = None
170

    
171
    # The new kind-of-type-system
172
    op_id = self.op.OP_ID
173
    for attr_name, aval, test in self._OP_PARAMS:
174
      if not hasattr(op, attr_name):
175
        if aval == ht.NoDefault:
176
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
177
                                     (op_id, attr_name), errors.ECODE_INVAL)
178
        else:
179
          if callable(aval):
180
            dval = aval()
181
          else:
182
            dval = aval
183
          setattr(self.op, attr_name, dval)
184
      attr_val = getattr(op, attr_name)
185
      if test == ht.NoType:
186
        # no tests here
187
        continue
188
      if not callable(test):
189
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
190
                                     " given type is not a proper type (%s)" %
191
                                     (op_id, attr_name, test))
192
      if not test(attr_val):
193
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
194
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
195
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
196
                                   (op_id, attr_name), errors.ECODE_INVAL)
197

    
198
    self.CheckArguments()
199

    
200
  def __GetSSH(self):
201
    """Returns the SshRunner object
202

203
    """
204
    if not self.__ssh:
205
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
206
    return self.__ssh
207

    
208
  ssh = property(fget=__GetSSH)
209

    
210
  def CheckArguments(self):
211
    """Check syntactic validity for the opcode arguments.
212

213
    This method is for doing a simple syntactic check and ensuring the
214
    validity of opcode parameters, without any cluster-related
215
    checks. While the same can be accomplished in ExpandNames and/or
216
    CheckPrereq, doing these separately is better because:
217

218
      - ExpandNames is left as purely a lock-related function
219
      - CheckPrereq is run after we have acquired locks (and possibly
220
        waited for them)
221

222
    The function is allowed to change the self.op attribute so that
223
    later methods can no longer worry about missing parameters.
224

225
    """
226
    pass
227

    
228
  def ExpandNames(self):
229
    """Expand names for this LU.
230

231
    This method is called before starting to execute the opcode, and it should
232
    update all the parameters of the opcode to their canonical form (e.g. a
233
    short node name must be fully expanded after this method has successfully
234
    completed). This way locking, hooks, logging, etc. can work correctly.
235

236
    LUs which implement this method must also populate the self.needed_locks
237
    member, as a dict with lock levels as keys, and a list of needed lock names
238
    as values. Rules:
239

240
      - use an empty dict if you don't need any lock
241
      - if you don't need any lock at a particular level omit that level
242
      - don't put anything for the BGL level
243
      - if you want all locks at a level use locking.ALL_SET as a value
244

245
    If you need to share locks (rather than acquire them exclusively) at one
246
    level you can modify self.share_locks, setting a true value (usually 1) for
247
    that level. By default locks are not shared.
248

249
    This function can also define a list of tasklets, which then will be
250
    executed in order instead of the usual LU-level CheckPrereq and Exec
251
    functions, if those are not defined by the LU.
252

253
    Examples::
254

255
      # Acquire all nodes and one instance
256
      self.needed_locks = {
257
        locking.LEVEL_NODE: locking.ALL_SET,
258
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
259
      }
260
      # Acquire just two nodes
261
      self.needed_locks = {
262
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
263
      }
264
      # Acquire no locks
265
      self.needed_locks = {} # No, you can't leave it to the default value None
266

267
    """
268
    # The implementation of this method is mandatory only if the new LU is
269
    # concurrent, so that old LUs don't need to be changed all at the same
270
    # time.
271
    if self.REQ_BGL:
272
      self.needed_locks = {} # Exclusive LUs don't need locks.
273
    else:
274
      raise NotImplementedError
275

    
276
  def DeclareLocks(self, level):
277
    """Declare LU locking needs for a level
278

279
    While most LUs can just declare their locking needs at ExpandNames time,
280
    sometimes there's the need to calculate some locks after having acquired
281
    the ones before. This function is called just before acquiring locks at a
282
    particular level, but after acquiring the ones at lower levels, and permits
283
    such calculations. It can be used to modify self.needed_locks, and by
284
    default it does nothing.
285

286
    This function is only called if you have something already set in
287
    self.needed_locks for the level.
288

289
    @param level: Locking level which is going to be locked
290
    @type level: member of ganeti.locking.LEVELS
291

292
    """
293

    
294
  def CheckPrereq(self):
295
    """Check prerequisites for this LU.
296

297
    This method should check that the prerequisites for the execution
298
    of this LU are fulfilled. It can do internode communication, but
299
    it should be idempotent - no cluster or system changes are
300
    allowed.
301

302
    The method should raise errors.OpPrereqError in case something is
303
    not fulfilled. Its return value is ignored.
304

305
    This method should also update all the parameters of the opcode to
306
    their canonical form if it hasn't been done by ExpandNames before.
307

308
    """
309
    if self.tasklets is not None:
310
      for (idx, tl) in enumerate(self.tasklets):
311
        logging.debug("Checking prerequisites for tasklet %s/%s",
312
                      idx + 1, len(self.tasklets))
313
        tl.CheckPrereq()
314
    else:
315
      pass
316

    
317
  def Exec(self, feedback_fn):
318
    """Execute the LU.
319

320
    This method should implement the actual work. It should raise
321
    errors.OpExecError for failures that are somewhat dealt with in
322
    code, or expected.
323

324
    """
325
    if self.tasklets is not None:
326
      for (idx, tl) in enumerate(self.tasklets):
327
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
328
        tl.Exec(feedback_fn)
329
    else:
330
      raise NotImplementedError
331

    
332
  def BuildHooksEnv(self):
333
    """Build hooks environment for this LU.
334

335
    This method should return a three-element tuple consisting of: a dict
336
    containing the environment that will be used for running the
337
    specific hook for this LU, a list of node names on which the hook
338
    should run before the execution, and a list of node names on which
339
    the hook should run after the execution.
340

341
    The keys of the dict must not have the 'GANETI_' prefix, as this will
342
    be handled in the hooks runner. Also note additional keys will be
343
    added by the hooks runner. If the LU doesn't define any
344
    environment, an empty dict (and not None) should be returned.
345

346
    "No nodes" should be expressed as an empty list (and not None).
347

348
    Note that if the HPATH for a LU class is None, this function will
349
    not be called.
350

351
    """
352
    raise NotImplementedError
353

    
354
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
355
    """Notify the LU about the results of its hooks.
356

357
    This method is called every time a hooks phase is executed, and notifies
358
    the Logical Unit about the hooks' result. The LU can then use it to alter
359
    its result based on the hooks.  By default the method does nothing and the
360
    previous result is passed back unchanged, but any LU can override it if it
361
    wants to use the local cluster hook-scripts somehow.
362

363
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
364
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
365
    @param hook_results: the results of the multi-node hooks rpc call
366
    @param feedback_fn: function used to send feedback back to the caller
367
    @param lu_result: the previous Exec result this LU had, or None
368
        in the PRE phase
369
    @return: the new Exec result, based on the previous result
370
        and hook results
371

372
    """
373
    # API must be kept, thus we ignore the unused argument and could
374
    # be a function warnings
375
    # pylint: disable-msg=W0613,R0201
376
    return lu_result
377

    
378
  def _ExpandAndLockInstance(self):
379
    """Helper function to expand and lock an instance.
380

381
    Many LUs that work on an instance take its name in self.op.instance_name
382
    and need to expand it and then declare the expanded name for locking. This
383
    function does it, and then updates self.op.instance_name to the expanded
384
    name. It also initializes needed_locks as a dict, if this hasn't been done
385
    before.
386

387
    """
388
    if self.needed_locks is None:
389
      self.needed_locks = {}
390
    else:
391
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
392
        "_ExpandAndLockInstance called with instance-level locks set"
393
    self.op.instance_name = _ExpandInstanceName(self.cfg,
394
                                                self.op.instance_name)
395
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
396

    
397
  def _LockInstancesNodes(self, primary_only=False):
398
    """Helper function to declare instances' nodes for locking.
399

400
    This function should be called after locking one or more instances to lock
401
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
402
    with all primary or secondary nodes for instances already locked and
403
    present in self.needed_locks[locking.LEVEL_INSTANCE].
404

405
    It should be called from DeclareLocks, and for safety only works if
406
    self.recalculate_locks[locking.LEVEL_NODE] is set.
407

408
    In the future it may grow parameters to just lock some instance's nodes, or
409
    to just lock primary or secondary nodes, if needed.
410

411
    It should be called in DeclareLocks in a way similar to::
412

413
      if level == locking.LEVEL_NODE:
414
        self._LockInstancesNodes()
415

416
    @type primary_only: boolean
417
    @param primary_only: only lock primary nodes of locked instances
418

419
    """
420
    assert locking.LEVEL_NODE in self.recalculate_locks, \
421
      "_LockInstancesNodes helper function called with no nodes to recalculate"
422

    
423
    # TODO: check if we've really been called with the instance locks held
424

    
425
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
426
    # future we might want to have different behaviors depending on the value
427
    # of self.recalculate_locks[locking.LEVEL_NODE]
428
    wanted_nodes = []
429
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
430
      instance = self.context.cfg.GetInstanceInfo(instance_name)
431
      wanted_nodes.append(instance.primary_node)
432
      if not primary_only:
433
        wanted_nodes.extend(instance.secondary_nodes)
434

    
435
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
436
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
437
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
438
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
439

    
440
    del self.recalculate_locks[locking.LEVEL_NODE]
441

    
442

    
443
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
444
  """Simple LU which runs no hooks.
445

446
  This LU is intended as a parent for other LogicalUnits which will
447
  run no hooks, in order to reduce duplicate code.
448

449
  """
450
  HPATH = None
451
  HTYPE = None
452

    
453
  def BuildHooksEnv(self):
454
    """Empty BuildHooksEnv for NoHooksLu.
455

456
    This just raises an error.
457

458
    """
459
    assert False, "BuildHooksEnv called for NoHooksLUs"
460

    
461

    
462
class Tasklet:
463
  """Tasklet base class.
464

465
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
466
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
467
  tasklets know nothing about locks.
468

469
  Subclasses must follow these rules:
470
    - Implement CheckPrereq
471
    - Implement Exec
472

473
  """
474
  def __init__(self, lu):
475
    self.lu = lu
476

    
477
    # Shortcuts
478
    self.cfg = lu.cfg
479
    self.rpc = lu.rpc
480

    
481
  def CheckPrereq(self):
482
    """Check prerequisites for this tasklets.
483

484
    This method should check whether the prerequisites for the execution of
485
    this tasklet are fulfilled. It can do internode communication, but it
486
    should be idempotent - no cluster or system changes are allowed.
487

488
    The method should raise errors.OpPrereqError in case something is not
489
    fulfilled. Its return value is ignored.
490

491
    This method should also update all parameters to their canonical form if it
492
    hasn't been done before.
493

494
    """
495
    pass
496

    
497
  def Exec(self, feedback_fn):
498
    """Execute the tasklet.
499

500
    This method should implement the actual work. It should raise
501
    errors.OpExecError for failures that are somewhat dealt with in code, or
502
    expected.
503

504
    """
505
    raise NotImplementedError
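# Minimal illustrative subclass (hypothetical, not part of this module): a
# tasklet receives its parent LU and implements only CheckPrereq and Exec,
# relying on the LU for locking.
#
#   class _ExampleNoopTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do for %s" % self.instance.name)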
506

    
507

    
508
class _QueryBase:
509
  """Base for query utility classes.
510

511
  """
512
  #: Attribute holding field definitions
513
  FIELDS = None
514

    
515
  def __init__(self, names, fields, use_locking):
516
    """Initializes this class.
517

518
    """
519
    self.names = names
520
    self.use_locking = use_locking
521

    
522
    self.query = query.Query(self.FIELDS, fields)
523
    self.requested_data = self.query.RequestedData()
524

    
525
    self.do_locking = None
526
    self.wanted = None
527

    
528
  def _GetNames(self, lu, all_names, lock_level):
529
    """Helper function to determine names asked for in the query.
530

531
    """
532
    if self.do_locking:
533
      names = lu.acquired_locks[lock_level]
534
    else:
535
      names = all_names
536

    
537
    if self.wanted == locking.ALL_SET:
538
      assert not self.names
539
      # caller didn't specify names, so ordering is not important
540
      return utils.NiceSort(names)
541

    
542
    # caller specified names and we must keep the same order
543
    assert self.names
544
    assert not self.do_locking or lu.acquired_locks[lock_level]
545

    
546
    missing = set(self.wanted).difference(names)
547
    if missing:
548
      raise errors.OpExecError("Some items were removed before retrieving"
549
                               " their data: %s" % missing)
550

    
551
    # Return expanded names
552
    return self.wanted
553

    
554
  @classmethod
555
  def FieldsQuery(cls, fields):
556
    """Returns list of available fields.
557

558
    @return: List of L{objects.QueryFieldDefinition}
559

560
    """
561
    return query.QueryFields(cls.FIELDS, fields)
562

    
563
  def ExpandNames(self, lu):
564
    """Expand names for this query.
565

566
    See L{LogicalUnit.ExpandNames}.
567

568
    """
569
    raise NotImplementedError()
570

    
571
  def DeclareLocks(self, lu, level):
572
    """Declare locks for this query.
573

574
    See L{LogicalUnit.DeclareLocks}.
575

576
    """
577
    raise NotImplementedError()
578

    
579
  def _GetQueryData(self, lu):
580
    """Collects all data for this query.
581

582
    @return: Query data object
583

584
    """
585
    raise NotImplementedError()
586

    
587
  def NewStyleQuery(self, lu):
588
    """Collect data and execute query.
589

590
    """
591
    data = self._GetQueryData(lu)
592

    
593
    return objects.QueryResponse(data=self.query.Query(data),
594
                                 fields=self.query.GetFields()).ToDict()
595

    
596
  def OldStyleQuery(self, lu):
597
    """Collect data and execute query.
598

599
    """
600
    return self.query.OldStyleQuery(self._GetQueryData(lu))
601

    
602

    
603
def _GetWantedNodes(lu, nodes):
604
  """Returns list of checked and expanded node names.
605

606
  @type lu: L{LogicalUnit}
607
  @param lu: the logical unit on whose behalf we execute
608
  @type nodes: list
609
  @param nodes: list of node names or None for all nodes
610
  @rtype: list
611
  @return: the list of nodes, sorted
612
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
613

614
  """
615
  if nodes:
616
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
617

    
618
  return utils.NiceSort(lu.cfg.GetNodeList())
619

    
620

    
621
def _GetWantedInstances(lu, instances):
622
  """Returns list of checked and expanded instance names.
623

624
  @type lu: L{LogicalUnit}
625
  @param lu: the logical unit on whose behalf we execute
626
  @type instances: list
627
  @param instances: list of instance names or None for all instances
628
  @rtype: list
629
  @return: the list of instances, sorted
630
  @raise errors.OpPrereqError: if the instances parameter is wrong type
631
  @raise errors.OpPrereqError: if any of the passed instances is not found
632

633
  """
634
  if instances:
635
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
636
  else:
637
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
638
  return wanted
639

    
640

    
641
def _GetUpdatedParams(old_params, update_dict,
642
                      use_default=True, use_none=False):
643
  """Return the new version of a parameter dictionary.
644

645
  @type old_params: dict
646
  @param old_params: old parameters
647
  @type update_dict: dict
648
  @param update_dict: dict containing new parameter values, or
649
      constants.VALUE_DEFAULT to reset the parameter to its default
650
      value
651
  @type use_default: boolean
652
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
653
      values as 'to be deleted' values
654
  @type use_none: boolean
655
  @param use_none: whether to recognise C{None} values as 'to be
656
      deleted' values
657
  @rtype: dict
658
  @return: the new parameter dictionary
659

660
  """
661
  params_copy = copy.deepcopy(old_params)
662
  for key, val in update_dict.iteritems():
663
    if ((use_default and val == constants.VALUE_DEFAULT) or
664
        (use_none and val is None)):
665
      try:
666
        del params_copy[key]
667
      except KeyError:
668
        pass
669
    else:
670
      params_copy[key] = val
671
  return params_copy
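# Worked example (illustrative values): with use_default=True, a value of
# constants.VALUE_DEFAULT removes the key so the cluster default applies
# again, while other values simply override the old ones.
#
#   _GetUpdatedParams({"maxmem": 1024, "vcpus": 2},
#                     {"maxmem": constants.VALUE_DEFAULT, "vcpus": 4})
#   --> {"vcpus": 4}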
672

    
673

    
674
def _CheckOutputFields(static, dynamic, selected):
675
  """Checks whether all selected fields are valid.
676

677
  @type static: L{utils.FieldSet}
678
  @param static: static fields set
679
  @type dynamic: L{utils.FieldSet}
680
  @param dynamic: dynamic fields set
681

682
  """
683
  f = utils.FieldSet()
684
  f.Extend(static)
685
  f.Extend(dynamic)
686

    
687
  delta = f.NonMatching(selected)
688
  if delta:
689
    raise errors.OpPrereqError("Unknown output fields selected: %s"
690
                               % ",".join(delta), errors.ECODE_INVAL)
691

    
692

    
693
def _CheckGlobalHvParams(params):
694
  """Validates that given hypervisor params are not global ones.
695

696
  This will ensure that instances don't get customised versions of
697
  global params.
698

699
  """
700
  used_globals = constants.HVC_GLOBALS.intersection(params)
701
  if used_globals:
702
    msg = ("The following hypervisor parameters are global and cannot"
703
           " be customized at instance level, please modify them at"
704
           " cluster level: %s" % utils.CommaJoin(used_globals))
705
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
706

    
707

    
708
def _CheckNodeOnline(lu, node, msg=None):
709
  """Ensure that a given node is online.
710

711
  @param lu: the LU on behalf of which we make the check
712
  @param node: the node to check
713
  @param msg: if passed, should be a message to replace the default one
714
  @raise errors.OpPrereqError: if the node is offline
715

716
  """
717
  if msg is None:
718
    msg = "Can't use offline node"
719
  if lu.cfg.GetNodeInfo(node).offline:
720
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
721

    
722

    
723
def _CheckNodeNotDrained(lu, node):
724
  """Ensure that a given node is not drained.
725

726
  @param lu: the LU on behalf of which we make the check
727
  @param node: the node to check
728
  @raise errors.OpPrereqError: if the node is drained
729

730
  """
731
  if lu.cfg.GetNodeInfo(node).drained:
732
    raise errors.OpPrereqError("Can't use drained node %s" % node,
733
                               errors.ECODE_STATE)
734

    
735

    
736
def _CheckNodeVmCapable(lu, node):
737
  """Ensure that a given node is vm capable.
738

739
  @param lu: the LU on behalf of which we make the check
740
  @param node: the node to check
741
  @raise errors.OpPrereqError: if the node is not vm capable
742

743
  """
744
  if not lu.cfg.GetNodeInfo(node).vm_capable:
745
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
746
                               errors.ECODE_STATE)
747

    
748

    
749
def _CheckNodeHasOS(lu, node, os_name, force_variant):
750
  """Ensure that a node supports a given OS.
751

752
  @param lu: the LU on behalf of which we make the check
753
  @param node: the node to check
754
  @param os_name: the OS to query about
755
  @param force_variant: whether to ignore variant errors
756
  @raise errors.OpPrereqError: if the node does not support the OS
757

758
  """
759
  result = lu.rpc.call_os_get(node, os_name)
760
  result.Raise("OS '%s' not in supported OS list for node %s" %
761
               (os_name, node),
762
               prereq=True, ecode=errors.ECODE_INVAL)
763
  if not force_variant:
764
    _CheckOSVariant(result.payload, os_name)
765

    
766

    
767
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
768
  """Ensure that a node has the given secondary ip.
769

770
  @type lu: L{LogicalUnit}
771
  @param lu: the LU on behalf of which we make the check
772
  @type node: string
773
  @param node: the node to check
774
  @type secondary_ip: string
775
  @param secondary_ip: the ip to check
776
  @type prereq: boolean
777
  @param prereq: whether to throw a prerequisite or an execute error
778
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
779
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
780

781
  """
782
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
783
  result.Raise("Failure checking secondary ip on node %s" % node,
784
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
785
  if not result.payload:
786
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
787
           " please fix and re-run this command" % secondary_ip)
788
    if prereq:
789
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
790
    else:
791
      raise errors.OpExecError(msg)
792

    
793

    
794
def _RequireFileStorage():
795
  """Checks that file storage is enabled.
796

797
  @raise errors.OpPrereqError: when file storage is disabled
798

799
  """
800
  if not constants.ENABLE_FILE_STORAGE:
801
    raise errors.OpPrereqError("File storage disabled at configure time",
802
                               errors.ECODE_INVAL)
803

    
804

    
805
def _CheckDiskTemplate(template):
806
  """Ensure a given disk template is valid.
807

808
  """
809
  if template not in constants.DISK_TEMPLATES:
810
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
811
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
812
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
813
  if template == constants.DT_FILE:
814
    _RequireFileStorage()
815
  return True
816

    
817

    
818
def _CheckStorageType(storage_type):
819
  """Ensure a given storage type is valid.
820

821
  """
822
  if storage_type not in constants.VALID_STORAGE_TYPES:
823
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
824
                               errors.ECODE_INVAL)
825
  if storage_type == constants.ST_FILE:
826
    _RequireFileStorage()
827
  return True
828

    
829

    
830
def _GetClusterDomainSecret():
831
  """Reads the cluster domain secret.
832

833
  """
834
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
835
                               strict=True)
836

    
837

    
838
def _CheckInstanceDown(lu, instance, reason):
839
  """Ensure that an instance is not running."""
840
  if instance.admin_up:
841
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
842
                               (instance.name, reason), errors.ECODE_STATE)
843

    
844
  pnode = instance.primary_node
845
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
846
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
847
              prereq=True, ecode=errors.ECODE_ENVIRON)
848

    
849
  if instance.name in ins_l.payload:
850
    raise errors.OpPrereqError("Instance %s is running, %s" %
851
                               (instance.name, reason), errors.ECODE_STATE)
852

    
853

    
854
def _ExpandItemName(fn, name, kind):
855
  """Expand an item name.
856

857
  @param fn: the function to use for expansion
858
  @param name: requested item name
859
  @param kind: text description ('Node' or 'Instance')
860
  @return: the resolved (full) name
861
  @raise errors.OpPrereqError: if the item is not found
862

863
  """
864
  full_name = fn(name)
865
  if full_name is None:
866
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
867
                               errors.ECODE_NOENT)
868
  return full_name
869

    
870

    
871
def _ExpandNodeName(cfg, name):
872
  """Wrapper over L{_ExpandItemName} for nodes."""
873
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
874

    
875

    
876
def _ExpandInstanceName(cfg, name):
877
  """Wrapper over L{_ExpandItemName} for instance."""
878
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
879

    
880

    
881
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
882
                          memory, vcpus, nics, disk_template, disks,
883
                          bep, hvp, hypervisor_name):
884
  """Builds instance related env variables for hooks
885

886
  This builds the hook environment from individual variables.
887

888
  @type name: string
889
  @param name: the name of the instance
890
  @type primary_node: string
891
  @param primary_node: the name of the instance's primary node
892
  @type secondary_nodes: list
893
  @param secondary_nodes: list of secondary nodes as strings
894
  @type os_type: string
895
  @param os_type: the name of the instance's OS
896
  @type status: boolean
897
  @param status: the should_run status of the instance
898
  @type memory: string
899
  @param memory: the memory size of the instance
900
  @type vcpus: string
901
  @param vcpus: the count of VCPUs the instance has
902
  @type nics: list
903
  @param nics: list of tuples (ip, mac, mode, link) representing
904
      the NICs the instance has
905
  @type disk_template: string
906
  @param disk_template: the disk template of the instance
907
  @type disks: list
908
  @param disks: the list of (size, mode) pairs
909
  @type bep: dict
910
  @param bep: the backend parameters for the instance
911
  @type hvp: dict
912
  @param hvp: the hypervisor parameters for the instance
913
  @type hypervisor_name: string
914
  @param hypervisor_name: the hypervisor for the instance
915
  @rtype: dict
916
  @return: the hook environment for this instance
917

918
  """
919
  if status:
920
    str_status = "up"
921
  else:
922
    str_status = "down"
923
  env = {
924
    "OP_TARGET": name,
925
    "INSTANCE_NAME": name,
926
    "INSTANCE_PRIMARY": primary_node,
927
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
928
    "INSTANCE_OS_TYPE": os_type,
929
    "INSTANCE_STATUS": str_status,
930
    "INSTANCE_MEMORY": memory,
931
    "INSTANCE_VCPUS": vcpus,
932
    "INSTANCE_DISK_TEMPLATE": disk_template,
933
    "INSTANCE_HYPERVISOR": hypervisor_name,
934
  }
935

    
936
  if nics:
937
    nic_count = len(nics)
938
    for idx, (ip, mac, mode, link) in enumerate(nics):
939
      if ip is None:
940
        ip = ""
941
      env["INSTANCE_NIC%d_IP" % idx] = ip
942
      env["INSTANCE_NIC%d_MAC" % idx] = mac
943
      env["INSTANCE_NIC%d_MODE" % idx] = mode
944
      env["INSTANCE_NIC%d_LINK" % idx] = link
945
      if mode == constants.NIC_MODE_BRIDGED:
946
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
947
  else:
948
    nic_count = 0
949

    
950
  env["INSTANCE_NIC_COUNT"] = nic_count
951

    
952
  if disks:
953
    disk_count = len(disks)
954
    for idx, (size, mode) in enumerate(disks):
955
      env["INSTANCE_DISK%d_SIZE" % idx] = size
956
      env["INSTANCE_DISK%d_MODE" % idx] = mode
957
  else:
958
    disk_count = 0
959

    
960
  env["INSTANCE_DISK_COUNT"] = disk_count
961

    
962
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
963
    for key, value in source.items():
964
      env["INSTANCE_%s_%s" % (kind, key)] = value
965

    
966
  return env
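# Illustrative result (hypothetical instance with one NIC and one disk):
# besides the fixed keys above, the per-NIC and per-disk keys are numbered
# from zero, e.g.:
#
#   INSTANCE_NAME=inst1.example.com
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MODE=bridged
#   INSTANCE_DISK_COUNT=1
#   INSTANCE_DISK0_SIZE=10240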
967

    
968

    
969
def _NICListToTuple(lu, nics):
970
  """Build a list of nic information tuples.
971

972
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
973
  value in LUQueryInstanceData.
974

975
  @type lu:  L{LogicalUnit}
976
  @param lu: the logical unit on whose behalf we execute
977
  @type nics: list of L{objects.NIC}
978
  @param nics: list of nics to convert to hooks tuples
979

980
  """
981
  hooks_nics = []
982
  cluster = lu.cfg.GetClusterInfo()
983
  for nic in nics:
984
    ip = nic.ip
985
    mac = nic.mac
986
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
987
    mode = filled_params[constants.NIC_MODE]
988
    link = filled_params[constants.NIC_LINK]
989
    hooks_nics.append((ip, mac, mode, link))
990
  return hooks_nics
991

    
992

    
993
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
994
  """Builds instance related env variables for hooks from an object.
995

996
  @type lu: L{LogicalUnit}
997
  @param lu: the logical unit on whose behalf we execute
998
  @type instance: L{objects.Instance}
999
  @param instance: the instance for which we should build the
1000
      environment
1001
  @type override: dict
1002
  @param override: dictionary with key/values that will override
1003
      our values
1004
  @rtype: dict
1005
  @return: the hook environment dictionary
1006

1007
  """
1008
  cluster = lu.cfg.GetClusterInfo()
1009
  bep = cluster.FillBE(instance)
1010
  hvp = cluster.FillHV(instance)
1011
  args = {
1012
    'name': instance.name,
1013
    'primary_node': instance.primary_node,
1014
    'secondary_nodes': instance.secondary_nodes,
1015
    'os_type': instance.os,
1016
    'status': instance.admin_up,
1017
    'memory': bep[constants.BE_MEMORY],
1018
    'vcpus': bep[constants.BE_VCPUS],
1019
    'nics': _NICListToTuple(lu, instance.nics),
1020
    'disk_template': instance.disk_template,
1021
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
1022
    'bep': bep,
1023
    'hvp': hvp,
1024
    'hypervisor_name': instance.hypervisor,
1025
  }
1026
  if override:
1027
    args.update(override)
1028
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1029

    
1030

    
1031
def _AdjustCandidatePool(lu, exceptions):
1032
  """Adjust the candidate pool after node operations.
1033

1034
  """
1035
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1036
  if mod_list:
1037
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1038
               utils.CommaJoin(node.name for node in mod_list))
1039
    for name in mod_list:
1040
      lu.context.ReaddNode(name)
1041
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1042
  if mc_now > mc_max:
1043
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1044
               (mc_now, mc_max))
1045

    
1046

    
1047
def _DecideSelfPromotion(lu, exceptions=None):
1048
  """Decide whether I should promote myself as a master candidate.
1049

1050
  """
1051
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1052
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1053
  # the new node will increase mc_max with one, so:
1054
  mc_should = min(mc_should + 1, cp_size)
1055
  return mc_now < mc_should
1056

    
1057

    
1058
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1059
  """Check that the brigdes needed by a list of nics exist.
1060

1061
  """
1062
  cluster = lu.cfg.GetClusterInfo()
1063
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1064
  brlist = [params[constants.NIC_LINK] for params in paramslist
1065
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1066
  if brlist:
1067
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1068
    result.Raise("Error checking bridges on destination node '%s'" %
1069
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1070

    
1071

    
1072
def _CheckInstanceBridgesExist(lu, instance, node=None):
1073
  """Check that the brigdes needed by an instance exist.
1074

1075
  """
1076
  if node is None:
1077
    node = instance.primary_node
1078
  _CheckNicsBridgesExist(lu, instance.nics, node)
1079

    
1080

    
1081
def _CheckOSVariant(os_obj, name):
1082
  """Check whether an OS name conforms to the os variants specification.
1083

1084
  @type os_obj: L{objects.OS}
1085
  @param os_obj: OS object to check
1086
  @type name: string
1087
  @param name: OS name passed by the user, to check for validity
1088

1089
  """
1090
  if not os_obj.supported_variants:
1091
    return
1092
  variant = objects.OS.GetVariant(name)
1093
  if not variant:
1094
    raise errors.OpPrereqError("OS name must include a variant",
1095
                               errors.ECODE_INVAL)
1096

    
1097
  if variant not in os_obj.supported_variants:
1098
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1099

    
1100

    
1101
def _GetNodeInstancesInner(cfg, fn):
1102
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1103

    
1104

    
1105
def _GetNodeInstances(cfg, node_name):
1106
  """Returns a list of all primary and secondary instances on a node.
1107

1108
  """
1109

    
1110
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1111

    
1112

    
1113
def _GetNodePrimaryInstances(cfg, node_name):
1114
  """Returns primary instances on a node.
1115

1116
  """
1117
  return _GetNodeInstancesInner(cfg,
1118
                                lambda inst: node_name == inst.primary_node)
1119

    
1120

    
1121
def _GetNodeSecondaryInstances(cfg, node_name):
1122
  """Returns secondary instances on a node.
1123

1124
  """
1125
  return _GetNodeInstancesInner(cfg,
1126
                                lambda inst: node_name in inst.secondary_nodes)
1127

    
1128

    
1129
def _GetStorageTypeArgs(cfg, storage_type):
1130
  """Returns the arguments for a storage type.
1131

1132
  """
1133
  # Special case for file storage
1134
  if storage_type == constants.ST_FILE:
1135
    # storage.FileStorage wants a list of storage directories
1136
    return [[cfg.GetFileStorageDir()]]
1137

    
1138
  return []
1139

    
1140

    
1141
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1142
  faulty = []
1143

    
1144
  for dev in instance.disks:
1145
    cfg.SetDiskID(dev, node_name)
1146

    
1147
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1148
  result.Raise("Failed to get disk status from node %s" % node_name,
1149
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1150

    
1151
  for idx, bdev_status in enumerate(result.payload):
1152
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1153
      faulty.append(idx)
1154

    
1155
  return faulty
1156

    
1157

    
1158
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1159
  """Check the sanity of iallocator and node arguments and use the
1160
  cluster-wide iallocator if appropriate.
1161

1162
  Check that at most one of (iallocator, node) is specified. If none is
1163
  specified, then the LU's opcode's iallocator slot is filled with the
1164
  cluster-wide default iallocator.
1165

1166
  @type iallocator_slot: string
1167
  @param iallocator_slot: the name of the opcode iallocator slot
1168
  @type node_slot: string
1169
  @param node_slot: the name of the opcode target node slot
1170

1171
  """
1172
  node = getattr(lu.op, node_slot, None)
1173
  iallocator = getattr(lu.op, iallocator_slot, None)
1174

    
1175
  if node is not None and iallocator is not None:
1176
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1177
                               errors.ECODE_INVAL)
1178
  elif node is None and iallocator is None:
1179
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1180
    if default_iallocator:
1181
      setattr(lu.op, iallocator_slot, default_iallocator)
1182
    else:
1183
      raise errors.OpPrereqError("No iallocator or node given and no"
1184
                                 " cluster-wide default iallocator found."
1185
                                 " Please specify either an iallocator or a"
1186
                                 " node, or set a cluster-wide default"
1187
                                 " iallocator.")
1188

    
1189

    
1190
class LUPostInitCluster(LogicalUnit):
1191
  """Logical unit for running hooks after cluster initialization.
1192

1193
  """
1194
  HPATH = "cluster-init"
1195
  HTYPE = constants.HTYPE_CLUSTER
1196

    
1197
  def BuildHooksEnv(self):
1198
    """Build hooks env.
1199

1200
    """
1201
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1202
    mn = self.cfg.GetMasterNode()
1203
    return env, [], [mn]
1204

    
1205
  def Exec(self, feedback_fn):
1206
    """Nothing to do.
1207

1208
    """
1209
    return True
1210

    
1211

    
1212
class LUDestroyCluster(LogicalUnit):
1213
  """Logical unit for destroying the cluster.
1214

1215
  """
1216
  HPATH = "cluster-destroy"
1217
  HTYPE = constants.HTYPE_CLUSTER
1218

    
1219
  def BuildHooksEnv(self):
1220
    """Build hooks env.
1221

1222
    """
1223
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1224
    return env, [], []
1225

    
1226
  def CheckPrereq(self):
1227
    """Check prerequisites.
1228

1229
    This checks whether the cluster is empty.
1230

1231
    Any errors are signaled by raising errors.OpPrereqError.
1232

1233
    """
1234
    master = self.cfg.GetMasterNode()
1235

    
1236
    nodelist = self.cfg.GetNodeList()
1237
    if len(nodelist) != 1 or nodelist[0] != master:
1238
      raise errors.OpPrereqError("There are still %d node(s) in"
1239
                                 " this cluster." % (len(nodelist) - 1),
1240
                                 errors.ECODE_INVAL)
1241
    instancelist = self.cfg.GetInstanceList()
1242
    if instancelist:
1243
      raise errors.OpPrereqError("There are still %d instance(s) in"
1244
                                 " this cluster." % len(instancelist),
1245
                                 errors.ECODE_INVAL)
1246

    
1247
  def Exec(self, feedback_fn):
1248
    """Destroys the cluster.
1249

1250
    """
1251
    master = self.cfg.GetMasterNode()
1252

    
1253
    # Run post hooks on master node before it's removed
1254
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
1255
    try:
1256
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
1257
    except:
1258
      # pylint: disable-msg=W0702
1259
      self.LogWarning("Errors occurred running hooks on %s" % master)
1260

    
1261
    result = self.rpc.call_node_stop_master(master, False)
1262
    result.Raise("Could not disable the master role")
1263

    
1264
    return master
1265

    
1266

    
1267
def _VerifyCertificate(filename):
1268
  """Verifies a certificate for LUVerifyCluster.
1269

1270
  @type filename: string
1271
  @param filename: Path to PEM file
1272

1273
  """
1274
  try:
1275
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1276
                                           utils.ReadFile(filename))
1277
  except Exception, err: # pylint: disable-msg=W0703
1278
    return (LUVerifyCluster.ETYPE_ERROR,
1279
            "Failed to load X509 certificate %s: %s" % (filename, err))
1280

    
1281
  (errcode, msg) = \
1282
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1283
                                constants.SSL_CERT_EXPIRATION_ERROR)
1284

    
1285
  if msg:
1286
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1287
  else:
1288
    fnamemsg = None
1289

    
1290
  if errcode is None:
1291
    return (None, fnamemsg)
1292
  elif errcode == utils.CERT_WARNING:
1293
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
1294
  elif errcode == utils.CERT_ERROR:
1295
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
1296

    
1297
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1298

    
1299

    
1300
class LUVerifyCluster(LogicalUnit):
1301
  """Verifies the cluster status.
1302

1303
  """
1304
  HPATH = "cluster-verify"
1305
  HTYPE = constants.HTYPE_CLUSTER
1306
  _OP_PARAMS = [
1307
    ("skip_checks", ht.EmptyList,
1308
     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1309
    ("verbose", False, ht.TBool),
1310
    ("error_codes", False, ht.TBool),
1311
    ("debug_simulate_errors", False, ht.TBool),
1312
    ]
1313
  REQ_BGL = False
1314

    
1315
  TCLUSTER = "cluster"
1316
  TNODE = "node"
1317
  TINSTANCE = "instance"
1318

    
1319
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1320
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1321
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1322
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1323
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1324
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1325
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1326
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1327
  ENODEDRBD = (TNODE, "ENODEDRBD")
1328
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1329
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1330
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1331
  ENODEHV = (TNODE, "ENODEHV")
1332
  ENODELVM = (TNODE, "ENODELVM")
1333
  ENODEN1 = (TNODE, "ENODEN1")
1334
  ENODENET = (TNODE, "ENODENET")
1335
  ENODEOS = (TNODE, "ENODEOS")
1336
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1337
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1338
  ENODERPC = (TNODE, "ENODERPC")
1339
  ENODESSH = (TNODE, "ENODESSH")
1340
  ENODEVERSION = (TNODE, "ENODEVERSION")
1341
  ENODESETUP = (TNODE, "ENODESETUP")
1342
  ENODETIME = (TNODE, "ENODETIME")
1343

    
1344
  ETYPE_FIELD = "code"
1345
  ETYPE_ERROR = "ERROR"
1346
  ETYPE_WARNING = "WARNING"
1347

    
1348
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1349

    
1350
  class NodeImage(object):
1351
    """A class representing the logical and physical status of a node.
1352

1353
    @type name: string
1354
    @ivar name: the node name to which this object refers
1355
    @ivar volumes: a structure as returned from
1356
        L{ganeti.backend.GetVolumeList} (runtime)
1357
    @ivar instances: a list of running instances (runtime)
1358
    @ivar pinst: list of configured primary instances (config)
1359
    @ivar sinst: list of configured secondary instances (config)
1360
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
1361
        of this node (config)
1362
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1363
    @ivar dfree: free disk, as reported by the node (runtime)
1364
    @ivar offline: the offline status (config)
1365
    @type rpc_fail: boolean
1366
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1367
        not whether the individual keys were correct) (runtime)
1368
    @type lvm_fail: boolean
1369
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1370
    @type hyp_fail: boolean
1371
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1372
    @type ghost: boolean
1373
    @ivar ghost: whether this is a known node or not (config)
1374
    @type os_fail: boolean
1375
    @ivar os_fail: whether the RPC call didn't return valid OS data
1376
    @type oslist: list
1377
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1378
    @type vm_capable: boolean
1379
    @ivar vm_capable: whether the node can host instances
1380

1381
    """
1382
    def __init__(self, offline=False, name=None, vm_capable=True):
1383
      self.name = name
1384
      self.volumes = {}
1385
      self.instances = []
1386
      self.pinst = []
1387
      self.sinst = []
1388
      self.sbp = {}
1389
      self.mfree = 0
1390
      self.dfree = 0
1391
      self.offline = offline
1392
      self.vm_capable = vm_capable
1393
      self.rpc_fail = False
1394
      self.lvm_fail = False
1395
      self.hyp_fail = False
1396
      self.ghost = False
1397
      self.os_fail = False
1398
      self.oslist = {}
1399

    
1400
  def ExpandNames(self):
1401
    self.needed_locks = {
1402
      locking.LEVEL_NODE: locking.ALL_SET,
1403
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1404
    }
1405
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1406

    
1407
  def _Error(self, ecode, item, msg, *args, **kwargs):
1408
    """Format an error message.
1409

1410
    Based on the opcode's error_codes parameter, either format a
1411
    parseable error code, or a simpler error string.
1412

1413
    This must be called only from Exec and functions called from Exec.
1414

1415
    """
1416
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1417
    itype, etxt = ecode
1418
    # first complete the msg
1419
    if args:
1420
      msg = msg % args
1421
    # then format the whole message
1422
    if self.op.error_codes:
1423
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1424
    else:
1425
      if item:
1426
        item = " " + item
1427
      else:
1428
        item = ""
1429
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1430
    # and finally report it via the feedback_fn
1431
    self._feedback_fn("  - %s" % msg)
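  # Illustrative output (hypothetical node name): for ecode=self.ENODELVM,
  # item="node1.example.com" and msg="unable to check volume groups", the
  # feedback line is
  #   "  - ERROR: node node1.example.com: unable to check volume groups"
  # or, with the error_codes opcode parameter set,
  #   "  - ERROR:ENODELVM:node:node1.example.com:unable to check volume groups"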
1432

    
1433
  def _ErrorIf(self, cond, *args, **kwargs):
1434
    """Log an error message if the passed condition is True.
1435

1436
    """
1437
    cond = bool(cond) or self.op.debug_simulate_errors
1438
    if cond:
1439
      self._Error(*args, **kwargs)
1440
    # do not mark the operation as failed for WARN cases only
1441
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1442
      self.bad = self.bad or cond
1443

    
1444
  def _VerifyNode(self, ninfo, nresult):
1445
    """Perform some basic validation on data returned from a node.
1446

1447
      - check the result data structure is well formed and has all the
1448
        mandatory fields
1449
      - check ganeti version
1450

1451
    @type ninfo: L{objects.Node}
1452
    @param ninfo: the node to check
1453
    @param nresult: the results from the node
1454
    @rtype: boolean
1455
    @return: whether overall this call was successful (and we can expect
1456
         reasonable values in the response)
1457

1458
    """
1459
    node = ninfo.name
1460
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1461

    
1462
    # main result, nresult should be a non-empty dict
1463
    test = not nresult or not isinstance(nresult, dict)
1464
    _ErrorIf(test, self.ENODERPC, node,
1465
                  "unable to verify node: no data returned")
1466
    if test:
1467
      return False
1468

    
1469
    # compares ganeti version
1470
    local_version = constants.PROTOCOL_VERSION
1471
    remote_version = nresult.get("version", None)
1472
    test = not (remote_version and
1473
                isinstance(remote_version, (list, tuple)) and
1474
                len(remote_version) == 2)
1475
    _ErrorIf(test, self.ENODERPC, node,
1476
             "connection to node returned invalid data")
1477
    if test:
1478
      return False
1479

    
1480
    test = local_version != remote_version[0]
1481
    _ErrorIf(test, self.ENODEVERSION, node,
1482
             "incompatible protocol versions: master %s,"
1483
             " node %s", local_version, remote_version[0])
1484
    if test:
1485
      return False
1486

    
1487
    # node seems compatible, we can actually try to look into its results
1488

    
1489
    # full package version
1490
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1491
                  self.ENODEVERSION, node,
1492
                  "software version mismatch: master %s, node %s",
1493
                  constants.RELEASE_VERSION, remote_version[1],
1494
                  code=self.ETYPE_WARNING)
1495

    
1496
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1497
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1498
      for hv_name, hv_result in hyp_result.iteritems():
1499
        test = hv_result is not None
1500
        _ErrorIf(test, self.ENODEHV, node,
1501
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1502

    
1503
    test = nresult.get(constants.NV_NODESETUP,
1504
                           ["Missing NODESETUP results"])
1505
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1506
             "; ".join(test))
1507

    
1508
    return True
1509

    
1510
  def _VerifyNodeTime(self, ninfo, nresult,
1511
                      nvinfo_starttime, nvinfo_endtime):
1512
    """Check the node time.
1513

1514
    @type ninfo: L{objects.Node}
1515
    @param ninfo: the node to check
1516
    @param nresult: the remote results for the node
1517
    @param nvinfo_starttime: the start time of the RPC call
1518
    @param nvinfo_endtime: the end time of the RPC call
1519

1520
    """
1521
    node = ninfo.name
1522
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1523

    
1524
    ntime = nresult.get(constants.NV_TIME, None)
1525
    try:
1526
      ntime_merged = utils.MergeTime(ntime)
1527
    except (ValueError, TypeError):
1528
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1529
      return
1530

    
1531
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1532
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1533
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1534
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1535
    else:
1536
      ntime_diff = None
1537

    
1538
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1539
             "Node time diverges by at least %s from master node time",
1540
             ntime_diff)
1541

    
1542
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1543
    """Check the node time.
1544

1545
    @type ninfo: L{objects.Node}
1546
    @param ninfo: the node to check
1547
    @param nresult: the remote results for the node
1548
    @param vg_name: the configured VG name
1549

1550
    """
1551
    if vg_name is None:
1552
      return
1553

    
1554
    node = ninfo.name
1555
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1556

    
1557
    # checks vg existence and size > 20G
1558
    vglist = nresult.get(constants.NV_VGLIST, None)
1559
    test = not vglist
1560
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1561
    if not test:
1562
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1563
                                            constants.MIN_VG_SIZE)
1564
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1565

    
1566
    # check pv names
1567
    pvlist = nresult.get(constants.NV_PVLIST, None)
1568
    test = pvlist is None
1569
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1570
    if not test:
1571
      # check that ':' is not present in PV names, since it's a
1572
      # special character for lvcreate (denotes the range of PEs to
1573
      # use on the PV)
1574
      for _, pvname, owner_vg in pvlist:
1575
        test = ":" in pvname
1576
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1577
                 " '%s' of VG '%s'", pvname, owner_vg)
1578

    
1579
  def _VerifyNodeNetwork(self, ninfo, nresult):
1580
    """Check the node time.
1581

1582
    @type ninfo: L{objects.Node}
1583
    @param ninfo: the node to check
1584
    @param nresult: the remote results for the node
1585

1586
    """
1587
    node = ninfo.name
1588
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1589

    
1590
    test = constants.NV_NODELIST not in nresult
1591
    _ErrorIf(test, self.ENODESSH, node,
1592
             "node hasn't returned node ssh connectivity data")
1593
    if not test:
1594
      if nresult[constants.NV_NODELIST]:
1595
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1596
          _ErrorIf(True, self.ENODESSH, node,
1597
                   "ssh communication with node '%s': %s", a_node, a_msg)
1598

    
1599
    test = constants.NV_NODENETTEST not in nresult
1600
    _ErrorIf(test, self.ENODENET, node,
1601
             "node hasn't returned node tcp connectivity data")
1602
    if not test:
1603
      if nresult[constants.NV_NODENETTEST]:
1604
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1605
        for anode in nlist:
1606
          _ErrorIf(True, self.ENODENET, node,
1607
                   "tcp communication with node '%s': %s",
1608
                   anode, nresult[constants.NV_NODENETTEST][anode])
1609

    
1610
    test = constants.NV_MASTERIP not in nresult
1611
    _ErrorIf(test, self.ENODENET, node,
1612
             "node hasn't returned node master IP reachability data")
1613
    if not test:
1614
      if not nresult[constants.NV_MASTERIP]:
1615
        if node == self.master_node:
1616
          msg = "the master node cannot reach the master IP (not configured?)"
1617
        else:
1618
          msg = "cannot reach the master IP"
1619
        _ErrorIf(True, self.ENODENET, node, msg)
1620

    
1621
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1622
                      diskstatus):
1623
    """Verify an instance.
1624

1625
    This function checks to see if the required block devices are
1626
    available on the instance's node.
1627

1628
    """
1629
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1630
    node_current = instanceconfig.primary_node
1631

    
1632
    node_vol_should = {}
1633
    instanceconfig.MapLVsByNode(node_vol_should)
1634

    
1635
    for node in node_vol_should:
1636
      n_img = node_image[node]
1637
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1638
        # ignore missing volumes on offline or broken nodes
1639
        continue
1640
      for volume in node_vol_should[node]:
1641
        test = volume not in n_img.volumes
1642
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1643
                 "volume %s missing on node %s", volume, node)
1644

    
1645
    if instanceconfig.admin_up:
1646
      pri_img = node_image[node_current]
1647
      test = instance not in pri_img.instances and not pri_img.offline
1648
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1649
               "instance not running on its primary node %s",
1650
               node_current)
1651

    
1652
    for node, n_img in node_image.items():
1653
      if (not node == node_current):
1654
        test = instance in n_img.instances
1655
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1656
                 "instance should not run on node %s", node)
1657

    
1658
    diskdata = [(nname, success, status, idx)
1659
                for (nname, disks) in diskstatus.items()
1660
                for idx, (success, status) in enumerate(disks)]
1661

    
1662
    for nname, success, bdev_status, idx in diskdata:
1663
      _ErrorIf(instanceconfig.admin_up and not success,
1664
               self.EINSTANCEFAULTYDISK, instance,
1665
               "couldn't retrieve status for disk/%s on %s: %s",
1666
               idx, nname, bdev_status)
1667
      _ErrorIf((instanceconfig.admin_up and success and
1668
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1669
               self.EINSTANCEFAULTYDISK, instance,
1670
               "disk/%s on %s is faulty", idx, nname)
1671

    
1672
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1673
    """Verify if there are any unknown volumes in the cluster.
1674

1675
    The .os, .swap and backup volumes are ignored. All other volumes are
1676
    reported as unknown.
1677

1678
    @type reserved: L{ganeti.utils.FieldSet}
1679
    @param reserved: a FieldSet of reserved volume names
1680

1681
    """
1682
    for node, n_img in node_image.items():
1683
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1684
        # skip non-healthy nodes
1685
        continue
1686
      for volume in n_img.volumes:
1687
        test = ((node not in node_vol_should or
1688
                volume not in node_vol_should[node]) and
1689
                not reserved.Matches(volume))
1690
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1691
                      "volume %s is unknown", volume)
1692

    
1693
  def _VerifyOrphanInstances(self, instancelist, node_image):
1694
    """Verify the list of running instances.
1695

1696
    This checks what instances are running but unknown to the cluster.
1697

1698
    """
1699
    for node, n_img in node_image.items():
1700
      for o_inst in n_img.instances:
1701
        test = o_inst not in instancelist
1702
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1703
                      "instance %s on node %s should not exist", o_inst, node)
1704

    
1705
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1706
    """Verify N+1 Memory Resilience.
1707

1708
    Check that if one single node dies we can still start all the
1709
    instances it was primary for.
1710

1711
    """
1712
    for node, n_img in node_image.items():
1713
      # This code checks that every node which is now listed as
1714
      # secondary has enough memory to host all instances it is
1715
      # supposed to should a single other node in the cluster fail.
1716
      # FIXME: not ready for failover to an arbitrary node
1717
      # FIXME: does not support file-backed instances
1718
      # WARNING: we currently take into account down instances as well
1719
      # as up ones, considering that even if they're down someone
1720
      # might want to start them even in the event of a node failure.
1721
      for prinode, instances in n_img.sbp.items():
1722
        needed_mem = 0
1723
        for instance in instances:
1724
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1725
          if bep[constants.BE_AUTO_BALANCE]:
1726
            needed_mem += bep[constants.BE_MEMORY]
1727
        test = n_img.mfree < needed_mem
1728
        self._ErrorIf(test, self.ENODEN1, node,
1729
                      "not enough memory to accomodate instance failovers"
1730
                      " should node %s fail", prinode)
1731

    
1732
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1733
                       master_files):
1734
    """Verifies and computes the node required file checksums.
1735

1736
    @type ninfo: L{objects.Node}
1737
    @param ninfo: the node to check
1738
    @param nresult: the remote results for the node
1739
    @param file_list: required list of files
1740
    @param local_cksum: dictionary of local files and their checksums
1741
    @param master_files: list of files that only masters should have
1742

1743
    """
1744
    node = ninfo.name
1745
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1746

    
1747
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1748
    test = not isinstance(remote_cksum, dict)
1749
    _ErrorIf(test, self.ENODEFILECHECK, node,
1750
             "node hasn't returned file checksum data")
1751
    if test:
1752
      return
1753

    
1754
    for file_name in file_list:
1755
      node_is_mc = ninfo.master_candidate
1756
      must_have = (file_name not in master_files) or node_is_mc
1757
      # missing
1758
      test1 = file_name not in remote_cksum
1759
      # invalid checksum
1760
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1761
      # existing and good
1762
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1763
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1764
               "file '%s' missing", file_name)
1765
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1766
               "file '%s' has wrong checksum", file_name)
1767
      # not candidate and this is not a must-have file
1768
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1769
               "file '%s' should not exist on non master"
1770
               " candidates (and the file is outdated)", file_name)
1771
      # all good, except non-master/non-must have combination
1772
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1773
               "file '%s' should not exist"
1774
               " on non master candidates", file_name)
1775

    
1776
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1777
                      drbd_map):
1778
    """Verifies and the node DRBD status.
1779

1780
    @type ninfo: L{objects.Node}
1781
    @param ninfo: the node to check
1782
    @param nresult: the remote results for the node
1783
    @param instanceinfo: the dict of instances
1784
    @param drbd_helper: the configured DRBD usermode helper
1785
    @param drbd_map: the DRBD map as returned by
1786
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1787

1788
    """
1789
    node = ninfo.name
1790
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1791

    
1792
    if drbd_helper:
1793
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1794
      test = (helper_result == None)
1795
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1796
               "no drbd usermode helper returned")
1797
      if helper_result:
1798
        status, payload = helper_result
1799
        test = not status
1800
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1801
                 "drbd usermode helper check unsuccessful: %s", payload)
1802
        test = status and (payload != drbd_helper)
1803
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1804
                 "wrong drbd usermode helper: %s", payload)
1805

    
1806
    # compute the DRBD minors
1807
    node_drbd = {}
1808
    for minor, instance in drbd_map[node].items():
1809
      test = instance not in instanceinfo
1810
      _ErrorIf(test, self.ECLUSTERCFG, None,
1811
               "ghost instance '%s' in temporary DRBD map", instance)
1812
        # ghost instance should not be running, but otherwise we
1813
        # don't give double warnings (both ghost instance and
1814
        # unallocated minor in use)
1815
      if test:
1816
        node_drbd[minor] = (instance, False)
1817
      else:
1818
        instance = instanceinfo[instance]
1819
        node_drbd[minor] = (instance.name, instance.admin_up)
1820

    
1821
    # and now check them
1822
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1823
    test = not isinstance(used_minors, (tuple, list))
1824
    _ErrorIf(test, self.ENODEDRBD, node,
1825
             "cannot parse drbd status file: %s", str(used_minors))
1826
    if test:
1827
      # we cannot check drbd status
1828
      return
1829

    
1830
    for minor, (iname, must_exist) in node_drbd.items():
1831
      test = minor not in used_minors and must_exist
1832
      _ErrorIf(test, self.ENODEDRBD, node,
1833
               "drbd minor %d of instance %s is not active", minor, iname)
1834
    for minor in used_minors:
1835
      test = minor not in node_drbd
1836
      _ErrorIf(test, self.ENODEDRBD, node,
1837
               "unallocated drbd minor %d is in use", minor)
1838

    
1839
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1840
    """Builds the node OS structures.
1841

1842
    @type ninfo: L{objects.Node}
1843
    @param ninfo: the node to check
1844
    @param nresult: the remote results for the node
1845
    @param nimg: the node image object
1846

1847
    """
1848
    node = ninfo.name
1849
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1850

    
1851
    remote_os = nresult.get(constants.NV_OSLIST, None)
1852
    test = (not isinstance(remote_os, list) or
1853
            not compat.all(isinstance(v, list) and len(v) == 7
1854
                           for v in remote_os))
1855

    
1856
    _ErrorIf(test, self.ENODEOS, node,
1857
             "node hasn't returned valid OS data")
1858

    
1859
    nimg.os_fail = test
1860

    
1861
    if test:
1862
      return
1863

    
1864
    os_dict = {}
1865

    
1866
    for (name, os_path, status, diagnose,
1867
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1868

    
1869
      if name not in os_dict:
1870
        os_dict[name] = []
1871

    
1872
      # parameters is a list of lists instead of list of tuples due to
1873
      # JSON lacking a real tuple type, fix it:
1874
      parameters = [tuple(v) for v in parameters]
1875
      os_dict[name].append((os_path, status, diagnose,
1876
                            set(variants), set(parameters), set(api_ver)))
1877

    
1878
    nimg.oslist = os_dict
1879

    
1880
  def _VerifyNodeOS(self, ninfo, nimg, base):
1881
    """Verifies the node OS list.
1882

1883
    @type ninfo: L{objects.Node}
1884
    @param ninfo: the node to check
1885
    @param nimg: the node image object
1886
    @param base: the 'template' node we match against (e.g. from the master)
1887

1888
    """
1889
    node = ninfo.name
1890
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1891

    
1892
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1893

    
1894
    for os_name, os_data in nimg.oslist.items():
1895
      assert os_data, "Empty OS status for OS %s?!" % os_name
1896
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1897
      _ErrorIf(not f_status, self.ENODEOS, node,
1898
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1899
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1900
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1901
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1902
      # this will catched in backend too
1903
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1904
               and not f_var, self.ENODEOS, node,
1905
               "OS %s with API at least %d does not declare any variant",
1906
               os_name, constants.OS_API_V15)
1907
      # comparisons with the 'base' image
1908
      test = os_name not in base.oslist
1909
      _ErrorIf(test, self.ENODEOS, node,
1910
               "Extra OS %s not present on reference node (%s)",
1911
               os_name, base.name)
1912
      if test:
1913
        continue
1914
      assert base.oslist[os_name], "Base node has empty OS status?"
1915
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1916
      if not b_status:
1917
        # base OS is invalid, skipping
1918
        continue
1919
      for kind, a, b in [("API version", f_api, b_api),
1920
                         ("variants list", f_var, b_var),
1921
                         ("parameters", f_param, b_param)]:
1922
        _ErrorIf(a != b, self.ENODEOS, node,
1923
                 "OS %s %s differs from reference node %s: %s vs. %s",
1924
                 kind, os_name, base.name,
1925
                 utils.CommaJoin(a), utils.CommaJoin(b))
1926

    
1927
    # check any missing OSes
1928
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1929
    _ErrorIf(missing, self.ENODEOS, node,
1930
             "OSes present on reference node %s but missing on this node: %s",
1931
             base.name, utils.CommaJoin(missing))
1932

    
1933
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1934
    """Verifies and updates the node volume data.
1935

1936
    This function will update a L{NodeImage}'s internal structures
1937
    with data from the remote call.
1938

1939
    @type ninfo: L{objects.Node}
1940
    @param ninfo: the node to check
1941
    @param nresult: the remote results for the node
1942
    @param nimg: the node image object
1943
    @param vg_name: the configured VG name
1944

1945
    """
1946
    node = ninfo.name
1947
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1948

    
1949
    nimg.lvm_fail = True
1950
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1951
    if vg_name is None:
1952
      pass
1953
    elif isinstance(lvdata, basestring):
1954
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1955
               utils.SafeEncode(lvdata))
1956
    elif not isinstance(lvdata, dict):
1957
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1958
    else:
1959
      nimg.volumes = lvdata
1960
      nimg.lvm_fail = False
1961

    
1962
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1963
    """Verifies and updates the node instance list.
1964

1965
    If the listing was successful, then updates this node's instance
1966
    list. Otherwise, it marks the RPC call as failed for the instance
1967
    list key.
1968

1969
    @type ninfo: L{objects.Node}
1970
    @param ninfo: the node to check
1971
    @param nresult: the remote results for the node
1972
    @param nimg: the node image object
1973

1974
    """
1975
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1976
    test = not isinstance(idata, list)
1977
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1978
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1979
    if test:
1980
      nimg.hyp_fail = True
1981
    else:
1982
      nimg.instances = idata
1983

    
1984
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1985
    """Verifies and computes a node information map
1986

1987
    @type ninfo: L{objects.Node}
1988
    @param ninfo: the node to check
1989
    @param nresult: the remote results for the node
1990
    @param nimg: the node image object
1991
    @param vg_name: the configured VG name
1992

1993
    """
1994
    node = ninfo.name
1995
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1996

    
1997
    # try to read free memory (from the hypervisor)
1998
    hv_info = nresult.get(constants.NV_HVINFO, None)
1999
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2000
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2001
    if not test:
2002
      try:
2003
        nimg.mfree = int(hv_info["memory_free"])
2004
      except (ValueError, TypeError):
2005
        _ErrorIf(True, self.ENODERPC, node,
2006
                 "node returned invalid nodeinfo, check hypervisor")
2007

    
2008
    # FIXME: devise a free space model for file based instances as well
2009
    if vg_name is not None:
2010
      test = (constants.NV_VGLIST not in nresult or
2011
              vg_name not in nresult[constants.NV_VGLIST])
2012
      _ErrorIf(test, self.ENODELVM, node,
2013
               "node didn't return data for the volume group '%s'"
2014
               " - it is either missing or broken", vg_name)
2015
      if not test:
2016
        try:
2017
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2018
        except (ValueError, TypeError):
2019
          _ErrorIf(True, self.ENODERPC, node,
2020
                   "node returned invalid LVM info, check LVM status")
2021

    
2022
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2023
    """Gets per-disk status information for all instances.
2024

2025
    @type nodelist: list of strings
2026
    @param nodelist: Node names
2027
    @type node_image: dict of (name, L{objects.Node})
2028
    @param node_image: Node objects
2029
    @type instanceinfo: dict of (name, L{objects.Instance})
2030
    @param instanceinfo: Instance objects
2031
    @rtype: {instance: {node: [(succes, payload)]}}
2032
    @return: a dictionary of per-instance dictionaries with nodes as
2033
        keys and disk information as values; the disk information is a
2034
        list of tuples (success, payload)
2035

2036
    """
2037
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2038

    
2039
    node_disks = {}
2040
    node_disks_devonly = {}
2041
    diskless_instances = set()
2042
    diskless = constants.DT_DISKLESS
2043

    
2044
    for nname in nodelist:
2045
      node_instances = list(itertools.chain(node_image[nname].pinst,
2046
                                            node_image[nname].sinst))
2047
      diskless_instances.update(inst for inst in node_instances
2048
                                if instanceinfo[inst].disk_template == diskless)
2049
      disks = [(inst, disk)
2050
               for inst in node_instances
2051
               for disk in instanceinfo[inst].disks]
2052

    
2053
      if not disks:
2054
        # No need to collect data
2055
        continue
2056

    
2057
      node_disks[nname] = disks
2058

    
2059
      # Creating copies as SetDiskID below will modify the objects and that can
2060
      # lead to incorrect data returned from nodes
2061
      devonly = [dev.Copy() for (_, dev) in disks]
2062

    
2063
      for dev in devonly:
2064
        self.cfg.SetDiskID(dev, nname)
2065

    
2066
      node_disks_devonly[nname] = devonly
2067

    
2068
    assert len(node_disks) == len(node_disks_devonly)
2069

    
2070
    # Collect data from all nodes with disks
2071
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2072
                                                          node_disks_devonly)
2073

    
2074
    assert len(result) == len(node_disks)
2075

    
2076
    instdisk = {}
2077

    
2078
    for (nname, nres) in result.items():
2079
      disks = node_disks[nname]
2080

    
2081
      if nres.offline:
2082
        # No data from this node
2083
        data = len(disks) * [(False, "node offline")]
2084
      else:
2085
        msg = nres.fail_msg
2086
        _ErrorIf(msg, self.ENODERPC, nname,
2087
                 "while getting disk information: %s", msg)
2088
        if msg:
2089
          # No data from this node
2090
          data = len(disks) * [(False, msg)]
2091
        else:
2092
          data = []
2093
          for idx, i in enumerate(nres.payload):
2094
            if isinstance(i, (tuple, list)) and len(i) == 2:
2095
              data.append(i)
2096
            else:
2097
              logging.warning("Invalid result from node %s, entry %d: %s",
2098
                              nname, idx, i)
2099
              data.append((False, "Invalid result from the remote node"))
2100

    
2101
      for ((inst, _), status) in zip(disks, data):
2102
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2103

    
2104
    # Add empty entries for diskless instances.
2105
    for inst in diskless_instances:
2106
      assert inst not in instdisk
2107
      instdisk[inst] = {}
2108

    
2109
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2110
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2111
                      compat.all(isinstance(s, (tuple, list)) and
2112
                                 len(s) == 2 for s in statuses)
2113
                      for inst, nnames in instdisk.items()
2114
                      for nname, statuses in nnames.items())
2115
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2116

    
2117
    return instdisk
2118

    
2119
  def BuildHooksEnv(self):
2120
    """Build hooks env.
2121

2122
    Cluster-Verify hooks just ran in the post phase and their failure makes
2123
    the output be logged in the verify output and the verification to fail.
2124

2125
    """
2126
    all_nodes = self.cfg.GetNodeList()
2127
    env = {
2128
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2129
      }
2130
    for node in self.cfg.GetAllNodesInfo().values():
2131
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
2132

    
2133
    return env, [], all_nodes
2134

    
2135
  def Exec(self, feedback_fn):
2136
    """Verify integrity of cluster, performing various test on nodes.
2137

2138
    """
2139
    self.bad = False
2140
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2141
    verbose = self.op.verbose
2142
    self._feedback_fn = feedback_fn
2143
    feedback_fn("* Verifying global settings")
2144
    for msg in self.cfg.VerifyConfig():
2145
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2146

    
2147
    # Check the cluster certificates
2148
    for cert_filename in constants.ALL_CERT_FILES:
2149
      (errcode, msg) = _VerifyCertificate(cert_filename)
2150
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2151

    
2152
    vg_name = self.cfg.GetVGName()
2153
    drbd_helper = self.cfg.GetDRBDHelper()
2154
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2155
    cluster = self.cfg.GetClusterInfo()
2156
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2157
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2158
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2159
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2160
                        for iname in instancelist)
2161
    i_non_redundant = [] # Non redundant instances
2162
    i_non_a_balanced = [] # Non auto-balanced instances
2163
    n_offline = 0 # Count of offline nodes
2164
    n_drained = 0 # Count of nodes being drained
2165
    node_vol_should = {}
2166

    
2167
    # FIXME: verify OS list
2168
    # do local checksums
2169
    master_files = [constants.CLUSTER_CONF_FILE]
2170
    master_node = self.master_node = self.cfg.GetMasterNode()
2171
    master_ip = self.cfg.GetMasterIP()
2172

    
2173
    file_names = ssconf.SimpleStore().GetFileList()
2174
    file_names.extend(constants.ALL_CERT_FILES)
2175
    file_names.extend(master_files)
2176
    if cluster.modify_etc_hosts:
2177
      file_names.append(constants.ETC_HOSTS)
2178

    
2179
    local_checksums = utils.FingerprintFiles(file_names)
2180

    
2181
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2182
    node_verify_param = {
2183
      constants.NV_FILELIST: file_names,
2184
      constants.NV_NODELIST: [node.name for node in nodeinfo
2185
                              if not node.offline],
2186
      constants.NV_HYPERVISOR: hypervisors,
2187
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2188
                                  node.secondary_ip) for node in nodeinfo
2189
                                 if not node.offline],
2190
      constants.NV_INSTANCELIST: hypervisors,
2191
      constants.NV_VERSION: None,
2192
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2193
      constants.NV_NODESETUP: None,
2194
      constants.NV_TIME: None,
2195
      constants.NV_MASTERIP: (master_node, master_ip),
2196
      constants.NV_OSLIST: None,
2197
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2198
      }
2199

    
2200
    if vg_name is not None:
2201
      node_verify_param[constants.NV_VGLIST] = None
2202
      node_verify_param[constants.NV_LVLIST] = vg_name
2203
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2204
      node_verify_param[constants.NV_DRBDLIST] = None
2205

    
2206
    if drbd_helper:
2207
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2208

    
2209
    # Build our expected cluster state
2210
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2211
                                                 name=node.name,
2212
                                                 vm_capable=node.vm_capable))
2213
                      for node in nodeinfo)
2214

    
2215
    for instance in instancelist:
2216
      inst_config = instanceinfo[instance]
2217

    
2218
      for nname in inst_config.all_nodes:
2219
        if nname not in node_image:
2220
          # ghost node
2221
          gnode = self.NodeImage(name=nname)
2222
          gnode.ghost = True
2223
          node_image[nname] = gnode
2224

    
2225
      inst_config.MapLVsByNode(node_vol_should)
2226

    
2227
      pnode = inst_config.primary_node
2228
      node_image[pnode].pinst.append(instance)
2229

    
2230
      for snode in inst_config.secondary_nodes:
2231
        nimg = node_image[snode]
2232
        nimg.sinst.append(instance)
2233
        if pnode not in nimg.sbp:
2234
          nimg.sbp[pnode] = []
2235
        nimg.sbp[pnode].append(instance)
2236

    
2237
    # At this point, we have the in-memory data structures complete,
2238
    # except for the runtime information, which we'll gather next
2239

    
2240
    # Due to the way our RPC system works, exact response times cannot be
2241
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2242
    # time before and after executing the request, we can at least have a time
2243
    # window.
2244
    nvinfo_starttime = time.time()
2245
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2246
                                           self.cfg.GetClusterName())
2247
    nvinfo_endtime = time.time()
2248

    
2249
    all_drbd_map = self.cfg.ComputeDRBDMap()
2250

    
2251
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2252
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2253

    
2254
    feedback_fn("* Verifying node status")
2255

    
2256
    refos_img = None
2257

    
2258
    for node_i in nodeinfo:
2259
      node = node_i.name
2260
      nimg = node_image[node]
2261

    
2262
      if node_i.offline:
2263
        if verbose:
2264
          feedback_fn("* Skipping offline node %s" % (node,))
2265
        n_offline += 1
2266
        continue
2267

    
2268
      if node == master_node:
2269
        ntype = "master"
2270
      elif node_i.master_candidate:
2271
        ntype = "master candidate"
2272
      elif node_i.drained:
2273
        ntype = "drained"
2274
        n_drained += 1
2275
      else:
2276
        ntype = "regular"
2277
      if verbose:
2278
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2279

    
2280
      msg = all_nvinfo[node].fail_msg
2281
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2282
      if msg:
2283
        nimg.rpc_fail = True
2284
        continue
2285

    
2286
      nresult = all_nvinfo[node].payload
2287

    
2288
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2289
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2290
      self._VerifyNodeNetwork(node_i, nresult)
2291
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2292
                            master_files)
2293

    
2294
      if nimg.vm_capable:
2295
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2296
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2297
                             all_drbd_map)
2298

    
2299
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2300
        self._UpdateNodeInstances(node_i, nresult, nimg)
2301
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2302
        self._UpdateNodeOS(node_i, nresult, nimg)
2303
        if not nimg.os_fail:
2304
          if refos_img is None:
2305
            refos_img = nimg
2306
          self._VerifyNodeOS(node_i, nimg, refos_img)
2307

    
2308
    feedback_fn("* Verifying instance status")
2309
    for instance in instancelist:
2310
      if verbose:
2311
        feedback_fn("* Verifying instance %s" % instance)
2312
      inst_config = instanceinfo[instance]
2313
      self._VerifyInstance(instance, inst_config, node_image,
2314
                           instdisk[instance])
2315
      inst_nodes_offline = []
2316

    
2317
      pnode = inst_config.primary_node
2318
      pnode_img = node_image[pnode]
2319
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2320
               self.ENODERPC, pnode, "instance %s, connection to"
2321
               " primary node failed", instance)
2322

    
2323
      if pnode_img.offline:
2324
        inst_nodes_offline.append(pnode)
2325

    
2326
      # If the instance is non-redundant we cannot survive losing its primary
2327
      # node, so we are not N+1 compliant. On the other hand we have no disk
2328
      # templates with more than one secondary so that situation is not well
2329
      # supported either.
2330
      # FIXME: does not support file-backed instances
2331
      if not inst_config.secondary_nodes:
2332
        i_non_redundant.append(instance)
2333
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2334
               instance, "instance has multiple secondary nodes: %s",
2335
               utils.CommaJoin(inst_config.secondary_nodes),
2336
               code=self.ETYPE_WARNING)
2337

    
2338
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2339
        i_non_a_balanced.append(instance)
2340

    
2341
      for snode in inst_config.secondary_nodes:
2342
        s_img = node_image[snode]
2343
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2344
                 "instance %s, connection to secondary node failed", instance)
2345

    
2346
        if s_img.offline:
2347
          inst_nodes_offline.append(snode)
2348

    
2349
      # warn that the instance lives on offline nodes
2350
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2351
               "instance lives on offline node(s) %s",
2352
               utils.CommaJoin(inst_nodes_offline))
2353
      # ... or ghost/non-vm_capable nodes
2354
      for node in inst_config.all_nodes:
2355
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2356
                 "instance lives on ghost node %s", node)
2357
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2358
                 instance, "instance lives on non-vm_capable node %s", node)
2359

    
2360
    feedback_fn("* Verifying orphan volumes")
2361
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2362
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2363

    
2364
    feedback_fn("* Verifying orphan instances")
2365
    self._VerifyOrphanInstances(instancelist, node_image)
2366

    
2367
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2368
      feedback_fn("* Verifying N+1 Memory redundancy")
2369
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2370

    
2371
    feedback_fn("* Other Notes")
2372
    if i_non_redundant:
2373
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2374
                  % len(i_non_redundant))
2375

    
2376
    if i_non_a_balanced:
2377
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2378
                  % len(i_non_a_balanced))
2379

    
2380
    if n_offline:
2381
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2382

    
2383
    if n_drained:
2384
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2385

    
2386
    return not self.bad
2387

    
2388
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2389
    """Analyze the post-hooks' result
2390

2391
    This method analyses the hook result, handles it, and sends some
2392
    nicely-formatted feedback back to the user.
2393

2394
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2395
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2396
    @param hooks_results: the results of the multi-node hooks rpc call
2397
    @param feedback_fn: function used send feedback back to the caller
2398
    @param lu_result: previous Exec result
2399
    @return: the new Exec result, based on the previous result
2400
        and hook results
2401

2402
    """
2403
    # We only really run POST phase hooks, and are only interested in
2404
    # their results
2405
    if phase == constants.HOOKS_PHASE_POST:
2406
      # Used to change hooks' output to proper indentation
2407
      feedback_fn("* Hooks Results")
2408
      assert hooks_results, "invalid result from hooks"
2409

    
2410
      for node_name in hooks_results:
2411
        res = hooks_results[node_name]
2412
        msg = res.fail_msg
2413
        test = msg and not res.offline
2414
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2415
                      "Communication failure in hooks execution: %s", msg)
2416
        if res.offline or msg:
2417
          # No need to investigate payload if node is offline or gave an error.
2418
          # override manually lu_result here as _ErrorIf only
2419
          # overrides self.bad
2420
          lu_result = 1
2421
          continue
2422
        for script, hkr, output in res.payload:
2423
          test = hkr == constants.HKR_FAIL
2424
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2425
                        "Script %s failed, output:", script)
2426
          if test:
2427
            output = self._HOOKS_INDENT_RE.sub('      ', output)
2428
            feedback_fn("%s" % output)
2429
            lu_result = 0
2430

    
2431
      return lu_result
2432

    
2433

    
2434
class LUVerifyDisks(NoHooksLU):
2435
  """Verifies the cluster disks status.
2436

2437
  """
2438
  REQ_BGL = False
2439

    
2440
  def ExpandNames(self):
2441
    self.needed_locks = {
2442
      locking.LEVEL_NODE: locking.ALL_SET,
2443
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2444
    }
2445
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2446

    
2447
  def Exec(self, feedback_fn):
2448
    """Verify integrity of cluster disks.
2449

2450
    @rtype: tuple of three items
2451
    @return: a tuple of (dict of node-to-node_error, list of instances
2452
        which need activate-disks, dict of instance: (node, volume) for
2453
        missing volumes
2454

2455
    """
2456
    result = res_nodes, res_instances, res_missing = {}, [], {}
2457

    
2458
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2459
    instances = [self.cfg.GetInstanceInfo(name)
2460
                 for name in self.cfg.GetInstanceList()]
2461

    
2462
    nv_dict = {}
2463
    for inst in instances:
2464
      inst_lvs = {}
2465
      if (not inst.admin_up or
2466
          inst.disk_template not in constants.DTS_NET_MIRROR):
2467
        continue
2468
      inst.MapLVsByNode(inst_lvs)
2469
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2470
      for node, vol_list in inst_lvs.iteritems():
2471
        for vol in vol_list:
2472
          nv_dict[(node, vol)] = inst
2473

    
2474
    if not nv_dict:
2475
      return result
2476

    
2477
    vg_names = self.rpc.call_vg_list(nodes)
2478
    vg_names.Raise("Cannot get list of VGs")
2479

    
2480
    for node in nodes:
2481
      # node_volume
2482
      node_res = self.rpc.call_lv_list([node],
2483
                                       vg_names[node].payload.keys())[node]
2484
      if node_res.offline:
2485
        continue
2486
      msg = node_res.fail_msg
2487
      if msg:
2488
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2489
        res_nodes[node] = msg
2490
        continue
2491

    
2492
      lvs = node_res.payload
2493
      for lv_name, (_, _, lv_online) in lvs.items():
2494
        inst = nv_dict.pop((node, lv_name), None)
2495
        if (not lv_online and inst is not None
2496
            and inst.name not in res_instances):
2497
          res_instances.append(inst.name)
2498

    
2499
    # any leftover items in nv_dict are missing LVs, let's arrange the
2500
    # data better
2501
    for key, inst in nv_dict.iteritems():
2502
      if inst.name not in res_missing:
2503
        res_missing[inst.name] = []
2504
      res_missing[inst.name].append(key)
2505

    
2506
    return result
2507

    
2508

    
2509
class LURepairDiskSizes(NoHooksLU):
2510
  """Verifies the cluster disks sizes.
2511

2512
  """
2513
  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
2514
  REQ_BGL = False
2515

    
2516
  def ExpandNames(self):
2517
    if self.op.instances:
2518
      self.wanted_names = []
2519
      for name in self.op.instances:
2520
        full_name = _ExpandInstanceName(self.cfg, name)
2521
        self.wanted_names.append(full_name)
2522
      self.needed_locks = {
2523
        locking.LEVEL_NODE: [],
2524
        locking.LEVEL_INSTANCE: self.wanted_names,
2525
        }
2526
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2527
    else:
2528
      self.wanted_names = None
2529
      self.needed_locks = {
2530
        locking.LEVEL_NODE: locking.ALL_SET,
2531
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2532
        }
2533
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2534

    
2535
  def DeclareLocks(self, level):
2536
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2537
      self._LockInstancesNodes(primary_only=True)
2538

    
2539
  def CheckPrereq(self):
2540
    """Check prerequisites.
2541

2542
    This only checks the optional instance list against the existing names.
2543

2544
    """
2545
    if self.wanted_names is None:
2546
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2547

    
2548
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2549
                             in self.wanted_names]
2550

    
2551
  def _EnsureChildSizes(self, disk):
2552
    """Ensure children of the disk have the needed disk size.
2553

2554
    This is valid mainly for DRBD8 and fixes an issue where the
2555
    children have smaller disk size.
2556

2557
    @param disk: an L{ganeti.objects.Disk} object
2558

2559
    """
2560
    if disk.dev_type == constants.LD_DRBD8:
2561
      assert disk.children, "Empty children for DRBD8?"
2562
      fchild = disk.children[0]
2563
      mismatch = fchild.size < disk.size
2564
      if mismatch:
2565
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2566
                     fchild.size, disk.size)
2567
        fchild.size = disk.size
2568

    
2569
      # and we recurse on this child only, not on the metadev
2570
      return self._EnsureChildSizes(fchild) or mismatch
2571
    else:
2572
      return False
2573

    
2574
  def Exec(self, feedback_fn):
2575
    """Verify the size of cluster disks.
2576

2577
    """
2578
    # TODO: check child disks too
2579
    # TODO: check differences in size between primary/secondary nodes
2580
    per_node_disks = {}
2581
    for instance in self.wanted_instances:
2582
      pnode = instance.primary_node
2583
      if pnode not in per_node_disks:
2584
        per_node_disks[pnode] = []
2585
      for idx, disk in enumerate(instance.disks):
2586
        per_node_disks[pnode].append((instance, idx, disk))
2587

    
2588
    changed = []
2589
    for node, dskl in per_node_disks.items():
2590
      newl = [v[2].Copy() for v in dskl]
2591
      for dsk in newl:
2592
        self.cfg.SetDiskID(dsk, node)
2593
      result = self.rpc.call_blockdev_getsizes(node, newl)
2594
      if result.fail_msg:
2595
        self.LogWarning("Failure in blockdev_getsizes call to node"
2596
                        " %s, ignoring", node)
2597
        continue
2598
      if len(result.data) != len(dskl):
2599
        self.LogWarning("Invalid result from node %s, ignoring node results",
2600
                        node)
2601
        continue
2602
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2603
        if size is None:
2604
          self.LogWarning("Disk %d of instance %s did not return size"
2605
                          " information, ignoring", idx, instance.name)
2606
          continue
2607
        if not isinstance(size, (int, long)):
2608
          self.LogWarning("Disk %d of instance %s did not return valid"
2609
                          " size information, ignoring", idx, instance.name)
2610
          continue
2611
        size = size >> 20
2612
        if size != disk.size:
2613
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2614
                       " correcting: recorded %d, actual %d", idx,
2615
                       instance.name, disk.size, size)
2616
          disk.size = size
2617
          self.cfg.Update(instance, feedback_fn)
2618
          changed.append((instance.name, idx, size))
2619
        if self._EnsureChildSizes(disk):
2620
          self.cfg.Update(instance, feedback_fn)
2621
          changed.append((instance.name, idx, disk.size))
2622
    return changed
2623

    
2624

    
2625
class LURenameCluster(LogicalUnit):
2626
  """Rename the cluster.
2627

2628
  """
2629
  HPATH = "cluster-rename"
2630
  HTYPE = constants.HTYPE_CLUSTER
2631
  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
2632

    
2633
  def BuildHooksEnv(self):
2634
    """Build hooks env.
2635

2636
    """
2637
    env = {
2638
      "OP_TARGET": self.cfg.GetClusterName(),
2639
      "NEW_NAME": self.op.name,
2640
      }
2641
    mn = self.cfg.GetMasterNode()
2642
    all_nodes = self.cfg.GetNodeList()
2643
    return env, [mn], all_nodes
2644

    
2645
  def CheckPrereq(self):
2646
    """Verify that the passed name is a valid one.
2647

2648
    """
2649
    hostname = netutils.GetHostname(name=self.op.name,
2650
                                    family=self.cfg.GetPrimaryIPFamily())
2651

    
2652
    new_name = hostname.name
2653
    self.ip = new_ip = hostname.ip
2654
    old_name = self.cfg.GetClusterName()
2655
    old_ip = self.cfg.GetMasterIP()
2656
    if new_name == old_name and new_ip == old_ip:
2657
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2658
                                 " cluster has changed",
2659
                                 errors.ECODE_INVAL)
2660
    if new_ip != old_ip:
2661
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2662
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2663
                                   " reachable on the network" %
2664
                                   new_ip, errors.ECODE_NOTUNIQUE)
2665

    
2666
    self.op.name = new_name
2667

    
2668
  def Exec(self, feedback_fn):
2669
    """Rename the cluster.
2670

2671
    """
2672
    clustername = self.op.name
2673
    ip = self.ip
2674

    
2675
    # shutdown the master IP
2676
    master = self.cfg.GetMasterNode()
2677
    result = self.rpc.call_node_stop_master(master, False)
2678
    result.Raise("Could not disable the master role")
2679

    
2680
    try:
2681
      cluster = self.cfg.GetClusterInfo()
2682
      cluster.cluster_name = clustername
2683
      cluster.master_ip = ip
2684
      self.cfg.Update(cluster, feedback_fn)
2685

    
2686
      # update the known hosts file
2687
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2688
      node_list = self.cfg.GetOnlineNodeList()
2689
      try:
2690
        node_list.remove(master)
2691
      except ValueError:
2692
        pass
2693
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
2694
    finally:
2695
      result = self.rpc.call_node_start_master(master, False, False)
2696
      msg = result.fail_msg
2697
      if msg:
2698
        self.LogWarning("Could not re-enable the master role on"
2699
                        " the master, please restart manually: %s", msg)
2700

    
2701
    return clustername
2702

    
2703

    
2704
class LUSetClusterParams(LogicalUnit):
2705
  """Change the parameters of the cluster.
2706

2707
  """
2708
  HPATH = "cluster-modify"
2709
  HTYPE = constants.HTYPE_CLUSTER
2710
  _OP_PARAMS = [
2711
    ("vg_name", None, ht.TMaybeString),
2712
    ("enabled_hypervisors", None,
2713
     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
2714
            ht.TNone)),
2715
    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2716
                              ht.TNone)),
2717
    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
2718
    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2719
                            ht.TNone)),
2720
    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2721
                              ht.TNone)),
2722
    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
2723
    ("uid_pool", None, ht.NoType),
2724
    ("add_uids", None, ht.NoType),
2725
    ("remove_uids", None, ht.NoType),
2726
    ("maintain_node_health", None, ht.TMaybeBool),
2727
    ("prealloc_wipe_disks", None, ht.TMaybeBool),
2728
    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
2729
    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
2730
    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
2731
    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
2732
    ("master_netdev", None, ht.TOr(ht.TString, ht.TNone)),
2733
    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
2734
    ("hidden_os", None, ht.TOr(ht.TListOf(\
2735
          ht.TAnd(ht.TList,
2736
                ht.TIsLength(2),
2737
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2738
          ht.TNone)),
2739
    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
2740
          ht.TAnd(ht.TList,
2741
                ht.TIsLength(2),
2742
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2743
          ht.TNone)),
2744
    ]
2745
  REQ_BGL = False
2746

    
2747
  def CheckArguments(self):
2748
    """Check parameters
2749

2750
    """
2751
    if self.op.uid_pool:
2752
      uidpool.CheckUidPool(self.op.uid_pool)
2753

    
2754
    if self.op.add_uids:
2755
      uidpool.CheckUidPool(self.op.add_uids)
2756

    
2757
    if self.op.remove_uids:
2758
      uidpool.CheckUidPool(self.op.remove_uids)
2759

    
2760
  def ExpandNames(self):
2761
    # FIXME: in the future maybe other cluster params won't require checking on
2762
    # all nodes to be modified.
2763
    self.needed_locks = {
2764
      locking.LEVEL_NODE: locking.ALL_SET,
2765
    }
2766
    self.share_locks[locking.LEVEL_NODE] = 1
2767

    
2768
  def BuildHooksEnv(self):
2769
    """Build hooks env.
2770

2771
    """
2772
    env = {
2773
      "OP_TARGET": self.cfg.GetClusterName(),
2774
      "NEW_VG_NAME": self.op.vg_name,
2775
      }
2776
    mn = self.cfg.GetMasterNode()
2777
    return env, [mn], [mn]
2778

    
2779
  def CheckPrereq(self):
2780
    """Check prerequisites.
2781

2782
    This checks whether the given params don't conflict and
2783
    if the given volume group is valid.
2784

2785
    """
2786
    if self.op.vg_name is not None and not self.op.vg_name:
2787
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
2788
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
2789
                                   " instances exist", errors.ECODE_INVAL)
2790

    
2791
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
2792
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
2793
        raise errors.OpPrereqError("Cannot disable drbd helper while"
2794
                                   " drbd-based instances exist",
2795
                                   errors.ECODE_INVAL)
2796

    
2797
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2798

    
2799
    # if vg_name not None, checks given volume group on all nodes
2800
    if self.op.vg_name:
2801
      vglist = self.rpc.call_vg_list(node_list)
2802
      for node in node_list:
2803
        msg = vglist[node].fail_msg
2804
        if msg:
2805
          # ignoring down node
2806
          self.LogWarning("Error while gathering data on node %s"
2807
                          " (ignoring node): %s", node, msg)
2808
          continue
2809
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2810
                                              self.op.vg_name,
2811
                                              constants.MIN_VG_SIZE)
2812
        if vgstatus:
2813
          raise errors.OpPrereqError("Error on node '%s': %s" %
2814
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2815

    
2816
    if self.op.drbd_helper:
2817
      # checks given drbd helper on all nodes
2818
      helpers = self.rpc.call_drbd_helper(node_list)
2819
      for node in node_list:
2820
        ninfo = self.cfg.GetNodeInfo(node)
2821
        if ninfo.offline:
2822
          self.LogInfo("Not checking drbd helper on offline node %s", node)
2823
          continue
2824
        msg = helpers[node].fail_msg
2825
        if msg:
2826
          raise errors.OpPrereqError("Error checking drbd helper on node"
2827
                                     " '%s': %s" % (node, msg),
2828
                                     errors.ECODE_ENVIRON)
2829
        node_helper = helpers[node].payload
2830
        if node_helper != self.op.drbd_helper:
2831
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
2832
                                     (node, node_helper), errors.ECODE_ENVIRON)
2833

    
2834
    self.cluster = cluster = self.cfg.GetClusterInfo()
2835
    # validate params changes
2836
    if self.op.beparams:
2837
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2838
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2839

    
2840
    if self.op.ndparams:
2841
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
2842
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
2843

    
2844
    if self.op.nicparams:
2845
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2846
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2847
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2848
      nic_errors = []
2849

    
2850
      # check all instances for consistency
2851
      for instance in self.cfg.GetAllInstancesInfo().values():
2852
        for nic_idx, nic in enumerate(instance.nics):
2853
          params_copy = copy.deepcopy(nic.nicparams)
2854
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2855

    
2856
          # check parameter syntax
2857
          try:
2858
            objects.NIC.CheckParameterSyntax(params_filled)
2859
          except errors.ConfigurationError, err:
2860
            nic_errors.append("Instance %s, nic/%d: %s" %
2861
                              (instance.name, nic_idx, err))
2862

    
2863
          # if we're moving instances to routed, check that they have an ip
2864
          target_mode = params_filled[constants.NIC_MODE]
2865
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2866
            nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
2867
                              (instance.name, nic_idx))
2868
      if nic_errors:
2869
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2870
                                   "\n".join(nic_errors))
2871
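    # Illustration of the layering done above (values are made up, not from a
    # real cluster): FillDict puts each NIC's own overrides on top of the
    # proposed cluster default, e.g.
    #   objects.FillDict({"mode": "bridged", "link": "xen-br0"},
    #                    {"mode": "routed"})
    #   -> {"mode": "routed", "link": "xen-br0"}
    # so the consistency loop only flags NICs that would actually inherit the
    # new cluster-wide mode.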

    
2872
    # hypervisor list/parameters
2873
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
2874
    if self.op.hvparams:
2875
      for hv_name, hv_dict in self.op.hvparams.items():
2876
        if hv_name not in self.new_hvparams:
2877
          self.new_hvparams[hv_name] = hv_dict
2878
        else:
2879
          self.new_hvparams[hv_name].update(hv_dict)
2880

    
2881
    # os hypervisor parameters
2882
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2883
    if self.op.os_hvp:
2884
      for os_name, hvs in self.op.os_hvp.items():
2885
        if os_name not in self.new_os_hvp:
2886
          self.new_os_hvp[os_name] = hvs
2887
        else:
2888
          for hv_name, hv_dict in hvs.items():
2889
            if hv_name not in self.new_os_hvp[os_name]:
2890
              self.new_os_hvp[os_name][hv_name] = hv_dict
2891
            else:
2892
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2893
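    # Sketch of the merge above (OS, hypervisor and parameter names are
    # illustrative only): an op.os_hvp of
    #   {"debootstrap": {"xen-pvm": {"kernel_path": "/boot/vmlinuz-custom"}}}
    # is folded into the existing per-OS/per-hypervisor dicts key by key, so
    # unrelated OSes and hypervisors keep their current settings.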

    
2894
    # os parameters
2895
    self.new_osp = objects.FillDict(cluster.osparams, {})
2896
    if self.op.osparams:
2897
      for os_name, osp in self.op.osparams.items():
2898
        if os_name not in self.new_osp:
2899
          self.new_osp[os_name] = {}
2900

    
2901
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
2902
                                                  use_none=True)
2903

    
2904
        if not self.new_osp[os_name]:
2905
          # we removed all parameters
2906
          del self.new_osp[os_name]
2907
        else:
2908
          # check the parameter validity (remote check)
2909
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
2910
                         os_name, self.new_osp[os_name])
2911
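    # Sketch (assumed semantics of _GetUpdatedParams): with use_none=True a
    # parameter submitted as None is dropped rather than stored, e.g.
    #   _GetUpdatedParams({"dhcp": "yes"}, {"dhcp": None}, use_none=True)
    #   -> {}
    # ("dhcp" is a made-up OS parameter); this is what makes the
    # "we removed all parameters" branch above reachable.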

    
2912
    # changes to the hypervisor list
2913
    if self.op.enabled_hypervisors is not None:
2914
      self.hv_list = self.op.enabled_hypervisors
2915
      for hv in self.hv_list:
2916
        # if the hypervisor doesn't already exist in the cluster
2917
        # hvparams, we initialize it to empty, and then (in both
2918
        # cases) we make sure to fill the defaults, as we might not
2919
        # have a complete defaults list if the hypervisor wasn't
2920
        # enabled before
2921
        if hv not in new_hvp:
2922
          new_hvp[hv] = {}
2923
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
2924
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
2925
    else:
2926
      self.hv_list = cluster.enabled_hypervisors
2927

    
2928
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2929
      # either the enabled list has changed, or the parameters have, validate
2930
      for hv_name, hv_params in self.new_hvparams.items():
2931
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2932
            (self.op.enabled_hypervisors and
2933
             hv_name in self.op.enabled_hypervisors)):
2934
          # either this is a new hypervisor, or its parameters have changed
2935
          hv_class = hypervisor.GetHypervisor(hv_name)
2936
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2937
          hv_class.CheckParameterSyntax(hv_params)
2938
          _CheckHVParams(self, node_list, hv_name, hv_params)
2939

    
2940
    if self.op.os_hvp:
2941
      # no need to check any newly-enabled hypervisors, since the
2942
      # defaults have already been checked in the above code-block
2943
      for os_name, os_hvp in self.new_os_hvp.items():
2944
        for hv_name, hv_params in os_hvp.items():
2945
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2946
          # we need to fill in the new os_hvp on top of the actual hvparams
2947
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2948
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2949
          hv_class = hypervisor.GetHypervisor(hv_name)
2950
          hv_class.CheckParameterSyntax(new_osp)
2951
          _CheckHVParams(self, node_list, hv_name, new_osp)
2952

    
2953
    if self.op.default_iallocator:
2954
      alloc_script = utils.FindFile(self.op.default_iallocator,
2955
                                    constants.IALLOCATOR_SEARCH_PATH,
2956
                                    os.path.isfile)
2957
      if alloc_script is None:
2958
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
2959
                                   " specified" % self.op.default_iallocator,
2960
                                   errors.ECODE_INVAL)
2961

    
2962
  def Exec(self, feedback_fn):
2963
    """Change the parameters of the cluster.
2964

2965
    """
2966
    if self.op.vg_name is not None:
2967
      new_volume = self.op.vg_name
2968
      if not new_volume:
2969
        new_volume = None
2970
      if new_volume != self.cfg.GetVGName():
2971
        self.cfg.SetVGName(new_volume)
2972
      else:
2973
        feedback_fn("Cluster LVM configuration already in desired"
2974
                    " state, not changing")
2975
    if self.op.drbd_helper is not None:
2976
      new_helper = self.op.drbd_helper
2977
      if not new_helper:
2978
        new_helper = None
2979
      if new_helper != self.cfg.GetDRBDHelper():
2980
        self.cfg.SetDRBDHelper(new_helper)
2981
      else:
2982
        feedback_fn("Cluster DRBD helper already in desired state,"
2983
                    " not changing")
2984
    if self.op.hvparams:
2985
      self.cluster.hvparams = self.new_hvparams
2986
    if self.op.os_hvp:
2987
      self.cluster.os_hvp = self.new_os_hvp
2988
    if self.op.enabled_hypervisors is not None:
2989
      self.cluster.hvparams = self.new_hvparams
2990
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2991
    if self.op.beparams:
2992
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2993
    if self.op.nicparams:
2994
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2995
    if self.op.osparams:
2996
      self.cluster.osparams = self.new_osp
2997
    if self.op.ndparams:
2998
      self.cluster.ndparams = self.new_ndparams
2999

    
3000
    if self.op.candidate_pool_size is not None:
3001
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3002
      # we need to update the pool size here, otherwise the save will fail
3003
      _AdjustCandidatePool(self, [])
3004

    
3005
    if self.op.maintain_node_health is not None:
3006
      self.cluster.maintain_node_health = self.op.maintain_node_health
3007

    
3008
    if self.op.prealloc_wipe_disks is not None:
3009
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3010

    
3011
    if self.op.add_uids is not None:
3012
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3013

    
3014
    if self.op.remove_uids is not None:
3015
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3016

    
3017
    if self.op.uid_pool is not None:
3018
      self.cluster.uid_pool = self.op.uid_pool
3019

    
3020
    if self.op.default_iallocator is not None:
3021
      self.cluster.default_iallocator = self.op.default_iallocator
3022

    
3023
    if self.op.reserved_lvs is not None:
3024
      self.cluster.reserved_lvs = self.op.reserved_lvs
3025

    
3026
    def helper_os(aname, mods, desc):
3027
      desc += " OS list"
3028
      lst = getattr(self.cluster, aname)
3029
      for key, val in mods:
3030
        if key == constants.DDM_ADD:
3031
          if val in lst:
3032
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3033
          else:
3034
            lst.append(val)
3035
        elif key == constants.DDM_REMOVE:
3036
          if val in lst:
3037
            lst.remove(val)
3038
          else:
3039
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3040
        else:
3041
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3042
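    # Illustration (made-up OS name): "mods" is a list of (action, os_name)
    # pairs, e.g.
    #   helper_os("hidden_os", [(constants.DDM_ADD, "debian-image")], "hidden")
    # would add "debian-image" to the hidden OS list unless already present.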

    
3043
    if self.op.hidden_os:
3044
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3045

    
3046
    if self.op.blacklisted_os:
3047
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3048

    
3049
    if self.op.master_netdev:
3050
      master = self.cfg.GetMasterNode()
3051
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3052
                  self.cluster.master_netdev)
3053
      result = self.rpc.call_node_stop_master(master, False)
3054
      result.Raise("Could not disable the master ip")
3055
      feedback_fn("Changing master_netdev from %s to %s" %
3056
                  (self.cluster.master_netdev, self.op.master_netdev))
3057
      self.cluster.master_netdev = self.op.master_netdev
3058

    
3059
    self.cfg.Update(self.cluster, feedback_fn)
3060

    
3061
    if self.op.master_netdev:
3062
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3063
                  self.op.master_netdev)
3064
      result = self.rpc.call_node_start_master(master, False, False)
3065
      if result.fail_msg:
3066
        self.LogWarning("Could not re-enable the master ip on"
3067
                        " the master, please restart manually: %s",
3068
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
3072
  """Helper for uploading a file and showing warnings.
3073

3074
  """
3075
  if os.path.exists(fname):
3076
    result = lu.rpc.call_upload_file(nodes, fname)
3077
    for to_node, to_result in result.items():
3078
      msg = to_result.fail_msg
3079
      if msg:
3080
        msg = ("Copy of file %s to node %s failed: %s" %
3081
               (fname, to_node, msg))
3082
        lu.proc.LogWarning(msg)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3086
  """Distribute additional files which are part of the cluster configuration.
3087

3088
  ConfigWriter takes care of distributing the config and ssconf files, but
3089
  there are more files which should be distributed to all nodes. This function
3090
  makes sure those are copied.
3091

3092
  @param lu: calling logical unit
3093
  @param additional_nodes: list of nodes not in the config to distribute to
3094
  @type additional_vm: boolean
3095
  @param additional_vm: whether the additional nodes are vm-capable or not
3096

3097
  """
3098
  # 1. Gather target nodes
3099
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3100
  dist_nodes = lu.cfg.GetOnlineNodeList()
3101
  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
3102
  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
3103
  if additional_nodes is not None:
3104
    dist_nodes.extend(additional_nodes)
3105
    if additional_vm:
3106
      vm_nodes.extend(additional_nodes)
3107
  if myself.name in dist_nodes:
3108
    dist_nodes.remove(myself.name)
3109
  if myself.name in vm_nodes:
3110
    vm_nodes.remove(myself.name)
3111

    
3112
  # 2. Gather files to distribute
3113
  dist_files = set([constants.ETC_HOSTS,
3114
                    constants.SSH_KNOWN_HOSTS_FILE,
3115
                    constants.RAPI_CERT_FILE,
3116
                    constants.RAPI_USERS_FILE,
3117
                    constants.CONFD_HMAC_KEY,
3118
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
3119
                   ])
3120

    
3121
  vm_files = set()
3122
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
3123
  for hv_name in enabled_hypervisors:
3124
    hv_class = hypervisor.GetHypervisor(hv_name)
3125
    vm_files.update(hv_class.GetAncillaryFiles())
3126

    
3127
  # 3. Perform the files upload
3128
  for fname in dist_files:
3129
    _UploadHelper(lu, dist_nodes, fname)
3130
  for fname in vm_files:
3131
    _UploadHelper(lu, vm_nodes, fname)


class LURedistributeConfig(NoHooksLU):
3135
  """Force the redistribution of cluster configuration.
3136

3137
  This is a very simple LU.
3138

3139
  """
3140
  REQ_BGL = False
3141

    
3142
  def ExpandNames(self):
3143
    self.needed_locks = {
3144
      locking.LEVEL_NODE: locking.ALL_SET,
3145
    }
3146
    self.share_locks[locking.LEVEL_NODE] = 1
3147

    
3148
  def Exec(self, feedback_fn):
3149
    """Redistribute the configuration.
3150

3151
    """
3152
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3153
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
3157
  """Sleep and poll for an instance's disk to sync.
3158

3159
  """
3160
  if not instance.disks or disks is not None and not disks:
3161
    return True
3162

    
3163
  disks = _ExpandCheckDisks(instance, disks)
3164

    
3165
  if not oneshot:
3166
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3167

    
3168
  node = instance.primary_node
3169

    
3170
  for dev in disks:
3171
    lu.cfg.SetDiskID(dev, node)
3172

    
3173
  # TODO: Convert to utils.Retry
3174

    
3175
  retries = 0
3176
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3177
  while True:
3178
    max_time = 0
3179
    done = True
3180
    cumul_degraded = False
3181
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3182
    msg = rstats.fail_msg
3183
    if msg:
3184
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3185
      retries += 1
3186
      if retries >= 10:
3187
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3188
                                 " aborting." % node)
3189
      time.sleep(6)
3190
      continue
3191
    rstats = rstats.payload
3192
    retries = 0
3193
    for i, mstat in enumerate(rstats):
3194
      if mstat is None:
3195
        lu.LogWarning("Can't compute data for node %s/%s",
3196
                           node, disks[i].iv_name)
3197
        continue
3198

    
3199
      cumul_degraded = (cumul_degraded or
3200
                        (mstat.is_degraded and mstat.sync_percent is None))
3201
      if mstat.sync_percent is not None:
3202
        done = False
3203
        if mstat.estimated_time is not None:
3204
          rem_time = ("%s remaining (estimated)" %
3205
                      utils.FormatSeconds(mstat.estimated_time))
3206
          max_time = mstat.estimated_time
3207
        else:
3208
          rem_time = "no time estimate"
3209
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3210
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3211

    
3212
    # if we're done but degraded, let's do a few small retries, to
3213
    # make sure we see a stable and not transient situation; therefore
3214
    # we force restart of the loop
3215
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3216
      logging.info("Degraded disks found, %d retries left", degr_retries)
3217
      degr_retries -= 1
3218
      time.sleep(1)
3219
      continue
3220

    
3221
    if done or oneshot:
3222
      break
3223

    
3224
    time.sleep(min(60, max_time))
3225

    
3226
  if done:
3227
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
3228
  return not cumul_degraded
3229

    
3230

    
3231
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3232
  """Check that mirrors are not degraded.
3233

3234
  The ldisk parameter, if True, will change the test from the
3235
  is_degraded attribute (which represents overall non-ok status for
3236
  the device(s)) to the ldisk (representing the local storage status).
3237

3238
  """
3239
  lu.cfg.SetDiskID(dev, node)
3240

    
3241
  result = True
3242

    
3243
  if on_primary or dev.AssembleOnSecondary():
3244
    rstats = lu.rpc.call_blockdev_find(node, dev)
3245
    msg = rstats.fail_msg
3246
    if msg:
3247
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
3248
      result = False
3249
    elif not rstats.payload:
3250
      lu.LogWarning("Can't find disk on node %s", node)
3251
      result = False
3252
    else:
3253
      if ldisk:
3254
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
3255
      else:
3256
        result = result and not rstats.payload.is_degraded
3257

    
3258
  if dev.children:
3259
    for child in dev.children:
3260
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
3261

    
3262
  return result


class LUOobCommand(NoHooksLU):
3266
  """Logical unit for OOB handling.
3267

3268
  """
3269
  _OP_PARAMS = [
3270
    _PNodeName,
3271
    ("command", None, ht.TElemOf(constants.OOB_COMMANDS)),
3272
    ("timeout", constants.OOB_TIMEOUT, ht.TInt),
3273
    ]
3274
  REQ_BGL = False
3275

    
3276
  def CheckPrereq(self):
3277
    """Check prerequisites.
3278

3279
    This checks:
3280
     - the node exists in the configuration
3281
     - OOB is supported
3282

3283
    Any errors are signaled by raising errors.OpPrereqError.
3284

3285
    """
3286
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3287
    node = self.cfg.GetNodeInfo(self.op.node_name)
3288

    
3289
    if node is None:
3290
      raise errors.OpPrereqError("Node %s not found" % self.op.node_name)
3291

    
3292
    self.oob_program = _SupportsOob(self.cfg, node)
3293

    
3294
    if not self.oob_program:
3295
      raise errors.OpPrereqError("OOB is not supported for node %s" %
3296
                                 self.op.node_name)
3297

    
3298
    if self.op.command == constants.OOB_POWER_OFF and not node.offline:
3299
      raise errors.OpPrereqError(("Cannot power off node %s because it is"
3300
                                  " not marked offline") % self.op.node_name)
3301

    
3302
    self.node = node
3303

    
3304
  def ExpandNames(self):
3305
    """Gather locks we need.
3306

3307
    """
3308
    node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3309
    self.needed_locks = {
3310
      locking.LEVEL_NODE: [node_name],
3311
      }
3312

    
3313
  def Exec(self, feedback_fn):
3314
    """Execute OOB and return result if we expect any.
3315

3316
    """
3317
    master_node = self.cfg.GetMasterNode()
3318
    node = self.node
3319

    
3320
    logging.info("Executing out-of-band command '%s' using '%s' on %s",
3321
                 self.op.command, self.oob_program, self.op.node_name)
3322
    result = self.rpc.call_run_oob(master_node, self.oob_program,
3323
                                   self.op.command, self.op.node_name,
3324
                                   self.op.timeout)
3325

    
3326
    result.Raise("An error occurred on execution of OOB helper")
3327

    
3328
    self._CheckPayload(result)
3329

    
3330
    if self.op.command == constants.OOB_HEALTH:
3331
      # For health we should log important events
3332
      for item, status in result.payload:
3333
        if status in [constants.OOB_STATUS_WARNING,
3334
                      constants.OOB_STATUS_CRITICAL]:
3335
          logging.warning("On node '%s' item '%s' has status '%s'",
3336
                          self.op.node_name, item, status)
3337

    
3338
    if self.op.command == constants.OOB_POWER_ON:
3339
      node.powered = True
3340
    elif self.op.command == constants.OOB_POWER_OFF:
3341
      node.powered = False
3342
    elif self.op.command == constants.OOB_POWER_STATUS:
3343
      powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
3344
      if powered != self.node.powered:
3345
        logging.warning(("Recorded power state (%s) of node '%s' does not match"
3346
                         " actual power state (%s)"), node.powered,
3347
                        self.op.node_name, powered)
3348

    
3349
    self.cfg.Update(node, feedback_fn)
3350

    
3351
    return result.payload
3352

    
3353
  def _CheckPayload(self, result):
3354
    """Checks if the payload is valid.
3355

3356
    @param result: RPC result
3357
    @raises errors.OpExecError: If payload is not valid
3358

3359
    """
3360
    errs = []
3361
    if self.op.command == constants.OOB_HEALTH:
3362
      if not isinstance(result.payload, list):
3363
        errs.append("command 'health' is expected to return a list but got %s" %
3364
                    type(result.payload))
3365
      for item, status in result.payload:
3366
        if status not in constants.OOB_STATUSES:
3367
          errs.append("health item '%s' has invalid status '%s'" %
3368
                      (item, status))
3369

    
3370
    if self.op.command == constants.OOB_POWER_STATUS:
3371
      if not isinstance(result.payload, dict):
3372
        errs.append("power-status is expected to return a dict but got %s" %
3373
                    type(result.payload))
3374

    
3375
    if self.op.command in [
3376
        constants.OOB_POWER_ON,
3377
        constants.OOB_POWER_OFF,
3378
        constants.OOB_POWER_CYCLE,
3379
        ]:
3380
      if result.payload is not None:
3381
        errs.append("%s is expected to not return payload but got '%s'" %
3382
                    (self.op.command, result.payload))
3383

    
3384
    if errs:
3385
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
3386
                               utils.CommaJoin(errs))
3387
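  # Summary of the payload shapes validated above (example values only, not
  # actual hardware output):
  #   OOB_POWER_STATUS        -> {constants.OOB_POWER_STATUS_POWERED: True}
  #   OOB_HEALTH              -> [("disk0", "OK"), ("fan1", "WARNING")]
  #   OOB_POWER_ON/OFF/CYCLE  -> None (no payload expected)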

    
3388

    
3389

    
3390
class LUDiagnoseOS(NoHooksLU):
3391
  """Logical unit for OS diagnose/query.
3392

3393
  """
3394
  _OP_PARAMS = [
3395
    _POutputFields,
3396
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3397
    ]
3398
  REQ_BGL = False
3399
  _HID = "hidden"
3400
  _BLK = "blacklisted"
3401
  _VLD = "valid"
3402
  _FIELDS_STATIC = utils.FieldSet()
3403
  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
3404
                                   "parameters", "api_versions", _HID, _BLK)
3405

    
3406
  def CheckArguments(self):
3407
    if self.op.names:
3408
      raise errors.OpPrereqError("Selective OS query not supported",
3409
                                 errors.ECODE_INVAL)
3410

    
3411
    _CheckOutputFields(static=self._FIELDS_STATIC,
3412
                       dynamic=self._FIELDS_DYNAMIC,
3413
                       selected=self.op.output_fields)
3414

    
3415
  def ExpandNames(self):
3416
    # Lock all nodes, in shared mode
3417
    # Temporary removal of locks, should be reverted later
3418
    # TODO: reintroduce locks when they are lighter-weight
3419
    self.needed_locks = {}
3420
    #self.share_locks[locking.LEVEL_NODE] = 1
3421
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3422

    
3423
  @staticmethod
3424
  def _DiagnoseByOS(rlist):
3425
    """Remaps a per-node return list into an a per-os per-node dictionary
3426

3427
    @param rlist: a map with node names as keys and OS objects as values
3428

3429
    @rtype: dict
3430
    @return: a dictionary with osnames as keys and as value another
3431
        map, with nodes as keys and tuples of (path, status, diagnose,
3432
        variants, parameters, api_versions) as values, eg::
3433

3434
          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
3435
                                     (/srv/..., False, "invalid api")],
3436
                           "node2": [(/srv/..., True, "", [], [])]}
3437
          }
3438

3439
    """
3440
    all_os = {}
3441
    # we build here the list of nodes that didn't fail the RPC (at RPC
3442
    # level), so that nodes with a non-responding node daemon don't
3443
    # make all OSes invalid
3444
    good_nodes = [node_name for node_name in rlist
3445
                  if not rlist[node_name].fail_msg]
3446
    for node_name, nr in rlist.items():
3447
      if nr.fail_msg or not nr.payload:
3448
        continue
3449
      for (name, path, status, diagnose, variants,
3450
           params, api_versions) in nr.payload:
3451
        if name not in all_os:
3452
          # build a list of nodes for this os containing empty lists
3453
          # for each node in node_list
3454
          all_os[name] = {}
3455
          for nname in good_nodes:
3456
            all_os[name][nname] = []
3457
        # convert params from [name, help] to (name, help)
3458
        params = [tuple(v) for v in params]
3459
        all_os[name][node_name].append((path, status, diagnose,
3460
                                        variants, params, api_versions))
3461
    return all_os
3462

    
3463
  def Exec(self, feedback_fn):
3464
    """Compute the list of OSes.
3465

3466
    """
3467
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
3468
    node_data = self.rpc.call_os_diagnose(valid_nodes)
3469
    pol = self._DiagnoseByOS(node_data)
3470
    output = []
3471
    cluster = self.cfg.GetClusterInfo()
3472

    
3473
    for os_name in utils.NiceSort(pol.keys()):
3474
      os_data = pol[os_name]
3475
      row = []
3476
      valid = True
3477
      (variants, params, api_versions) = null_state = (set(), set(), set())
3478
      for idx, osl in enumerate(os_data.values()):
3479
        valid = bool(valid and osl and osl[0][1])
3480
        if not valid:
3481
          (variants, params, api_versions) = null_state
3482
          break
3483
        node_variants, node_params, node_api = osl[0][3:6]
3484
        if idx == 0: # first entry
3485
          variants = set(node_variants)
3486
          params = set(node_params)
3487
          api_versions = set(node_api)
3488
        else: # keep consistency
3489
          variants.intersection_update(node_variants)
3490
          params.intersection_update(node_params)
3491
          api_versions.intersection_update(node_api)
3492
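      # Illustration (made-up variants): if node1 reports {"squeeze",
      # "wheezy"} and node2 reports only {"wheezy"}, the intersection above
      # keeps {"wheezy"}, i.e. only capabilities common to every node on
      # which the OS is valid are reported.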

    
3493
      is_hid = os_name in cluster.hidden_os
3494
      is_blk = os_name in cluster.blacklisted_os
3495
      if ((self._HID not in self.op.output_fields and is_hid) or
3496
          (self._BLK not in self.op.output_fields and is_blk) or
3497
          (self._VLD not in self.op.output_fields and not valid)):
3498
        continue
3499

    
3500
      for field in self.op.output_fields:
3501
        if field == "name":
3502
          val = os_name
3503
        elif field == self._VLD:
3504
          val = valid
3505
        elif field == "node_status":
3506
          # this is just a copy of the dict
3507
          val = {}
3508
          for node_name, nos_list in os_data.items():
3509
            val[node_name] = nos_list
3510
        elif field == "variants":
3511
          val = utils.NiceSort(list(variants))
3512
        elif field == "parameters":
3513
          val = list(params)
3514
        elif field == "api_versions":
3515
          val = list(api_versions)
3516
        elif field == self._HID:
3517
          val = is_hid
3518
        elif field == self._BLK:
3519
          val = is_blk
3520
        else:
3521
          raise errors.ParameterError(field)
3522
        row.append(val)
3523
      output.append(row)
3524

    
3525
    return output
3526

    
3527

    
3528
class LURemoveNode(LogicalUnit):
3529
  """Logical unit for removing a node.
3530

3531
  """
3532
  HPATH = "node-remove"
3533
  HTYPE = constants.HTYPE_NODE
3534
  _OP_PARAMS = [
3535
    _PNodeName,
3536
    ]
3537

    
3538
  def BuildHooksEnv(self):
3539
    """Build hooks env.
3540

3541
    This doesn't run on the target node in the pre phase as a failed
3542
    node would then be impossible to remove.
3543

3544
    """
3545
    env = {
3546
      "OP_TARGET": self.op.node_name,
3547
      "NODE_NAME": self.op.node_name,
3548
      }
3549
    all_nodes = self.cfg.GetNodeList()
3550
    try:
3551
      all_nodes.remove(self.op.node_name)
3552
    except ValueError:
3553
      logging.warning("Node %s which is about to be removed not found"
3554
                      " in the all nodes list", self.op.node_name)
3555
    return env, all_nodes, all_nodes
3556

    
3557
  def CheckPrereq(self):
3558
    """Check prerequisites.
3559

3560
    This checks:
3561
     - the node exists in the configuration
3562
     - it does not have primary or secondary instances
3563
     - it's not the master
3564

3565
    Any errors are signaled by raising errors.OpPrereqError.
3566

3567
    """
3568
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3569
    node = self.cfg.GetNodeInfo(self.op.node_name)
3570
    assert node is not None
3571

    
3572
    instance_list = self.cfg.GetInstanceList()
3573

    
3574
    masternode = self.cfg.GetMasterNode()
3575
    if node.name == masternode:
3576
      raise errors.OpPrereqError("Node is the master node,"
3577
                                 " you need to failover first.",
3578
                                 errors.ECODE_INVAL)
3579

    
3580
    for instance_name in instance_list:
3581
      instance = self.cfg.GetInstanceInfo(instance_name)
3582
      if node.name in instance.all_nodes:
3583
        raise errors.OpPrereqError("Instance %s is still running on the node,"
3584
                                   " please remove first." % instance_name,
3585
                                   errors.ECODE_INVAL)
3586
    self.op.node_name = node.name
3587
    self.node = node
3588

    
3589
  def Exec(self, feedback_fn):
3590
    """Removes the node from the cluster.
3591

3592
    """
3593
    node = self.node
3594
    logging.info("Stopping the node daemon and removing configs from node %s",
3595
                 node.name)
3596

    
3597
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
3598

    
3599
    # Promote nodes to master candidate as needed
3600
    _AdjustCandidatePool(self, exceptions=[node.name])
3601
    self.context.RemoveNode(node.name)
3602

    
3603
    # Run post hooks on the node before it's removed
3604
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
3605
    try:
3606
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
3607
    except:
3608
      # pylint: disable-msg=W0702
3609
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
3610

    
3611
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
3612
    msg = result.fail_msg
3613
    if msg:
3614
      self.LogWarning("Errors encountered on the remote node while leaving"
3615
                      " the cluster: %s", msg)
3616

    
3617
    # Remove node from our /etc/hosts
3618
    if self.cfg.GetClusterInfo().modify_etc_hosts:
3619
      master_node = self.cfg.GetMasterNode()
3620
      result = self.rpc.call_etc_hosts_modify(master_node,
3621
                                              constants.ETC_HOSTS_REMOVE,
3622
                                              node.name, None)
3623
      result.Raise("Can't update hosts file with new host data")
3624
      _RedistributeAncillaryFiles(self)
3625

    
3626

    
3627
class _NodeQuery(_QueryBase):
3628
  FIELDS = query.NODE_FIELDS
3629

    
3630
  def ExpandNames(self, lu):
3631
    lu.needed_locks = {}
3632
    lu.share_locks[locking.LEVEL_NODE] = 1
3633

    
3634
    if self.names:
3635
      self.wanted = _GetWantedNodes(lu, self.names)
3636
    else:
3637
      self.wanted = locking.ALL_SET
3638

    
3639
    self.do_locking = (self.use_locking and
3640
                       query.NQ_LIVE in self.requested_data)
3641

    
3642
    if self.do_locking:
3643
      # if we don't request only static fields, we need to lock the nodes
3644
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
3645

    
3646
  def DeclareLocks(self, lu, level):
3647
    pass
3648

    
3649
  def _GetQueryData(self, lu):
3650
    """Computes the list of nodes and their attributes.
3651

3652
    """
3653
    all_info = lu.cfg.GetAllNodesInfo()
3654

    
3655
    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
3656

    
3657
    # Gather data as requested
3658
    if query.NQ_LIVE in self.requested_data:
3659
      node_data = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
3660
                                        lu.cfg.GetHypervisorType())
3661
      live_data = dict((name, nresult.payload)
3662
                       for (name, nresult) in node_data.items()
3663
                       if not nresult.fail_msg and nresult.payload)
3664
    else:
3665
      live_data = None
3666

    
3667
    if query.NQ_INST in self.requested_data:
3668
      node_to_primary = dict([(name, set()) for name in nodenames])
3669
      node_to_secondary = dict([(name, set()) for name in nodenames])
3670

    
3671
      inst_data = lu.cfg.GetAllInstancesInfo()
3672

    
3673
      for inst in inst_data.values():
3674
        if inst.primary_node in node_to_primary:
3675
          node_to_primary[inst.primary_node].add(inst.name)
3676
        for secnode in inst.secondary_nodes:
3677
          if secnode in node_to_secondary:
3678
            node_to_secondary[secnode].add(inst.name)
3679
    else:
3680
      node_to_primary = None
3681
      node_to_secondary = None
3682

    
3683
    if query.NQ_GROUP in self.requested_data:
3684
      groups = lu.cfg.GetAllNodeGroupsInfo()
3685
    else:
3686
      groups = {}
3687

    
3688
    return query.NodeQueryData([all_info[name] for name in nodenames],
3689
                               live_data, lu.cfg.GetMasterNode(),
3690
                               node_to_primary, node_to_secondary, groups)
3691

    
3692

    
3693
class LUQueryNodes(NoHooksLU):
3694
  """Logical unit for querying nodes.
3695

3696
  """
3697
  # pylint: disable-msg=W0142
3698
  _OP_PARAMS = [
3699
    _POutputFields,
3700
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3701
    ("use_locking", False, ht.TBool),
3702
    ]
3703
  REQ_BGL = False
3704

    
3705
  def CheckArguments(self):
3706
    self.nq = _NodeQuery(self.op.names, self.op.output_fields,
3707
                         self.op.use_locking)
3708

    
3709
  def ExpandNames(self):
3710
    self.nq.ExpandNames(self)
3711

    
3712
  def Exec(self, feedback_fn):
3713
    return self.nq.OldStyleQuery(self)
3714

    
3715

    
3716
class LUQueryNodeVolumes(NoHooksLU):
3717
  """Logical unit for getting volumes on node(s).
3718

3719
  """
3720
  _OP_PARAMS = [
3721
    _POutputFields,
3722
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3723
    ]
3724
  REQ_BGL = False
3725
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
3726
  _FIELDS_STATIC = utils.FieldSet("node")
3727

    
3728
  def CheckArguments(self):
3729
    _CheckOutputFields(static=self._FIELDS_STATIC,
3730
                       dynamic=self._FIELDS_DYNAMIC,
3731
                       selected=self.op.output_fields)
3732

    
3733
  def ExpandNames(self):
3734
    self.needed_locks = {}
3735
    self.share_locks[locking.LEVEL_NODE] = 1
3736
    if not self.op.nodes:
3737
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3738
    else:
3739
      self.needed_locks[locking.LEVEL_NODE] = \
3740
        _GetWantedNodes(self, self.op.nodes)
3741

    
3742
  def Exec(self, feedback_fn):
3743
    """Computes the list of nodes and their attributes.
3744

3745
    """
3746
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
3747
    volumes = self.rpc.call_node_volumes(nodenames)
3748

    
3749
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
3750
             in self.cfg.GetInstanceList()]
3751

    
3752
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
3753

    
3754
    output = []
3755
    for node in nodenames:
3756
      nresult = volumes[node]
3757
      if nresult.offline:
3758
        continue
3759
      msg = nresult.fail_msg
3760
      if msg:
3761
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
3762
        continue
3763

    
3764
      node_vols = nresult.payload[:]
3765
      node_vols.sort(key=lambda vol: vol['dev'])
3766
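      # Rough sketch of one payload entry (illustrative values), matching the
      # keys accessed below:
      #   {"dev": "/dev/sda5", "vg": "xenvg",
      #    "name": "inst1-disk0", "size": 10240}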

    
3767
      for vol in node_vols:
3768
        node_output = []
3769
        for field in self.op.output_fields:
3770
          if field == "node":
3771
            val = node
3772
          elif field == "phys":
3773
            val = vol['dev']
3774
          elif field == "vg":
3775
            val = vol['vg']
3776
          elif field == "name":
3777
            val = vol['name']
3778
          elif field == "size":
3779
            val = int(float(vol['size']))
3780
          elif field == "instance":
3781
            for inst in ilist:
3782
              if node not in lv_by_node[inst]:
3783
                continue
3784
              if vol['name'] in lv_by_node[inst][node]:
3785
                val = inst.name
3786
                break
3787
            else:
3788
              val = '-'
3789
          else:
3790
            raise errors.ParameterError(field)
3791
          node_output.append(str(val))
3792

    
3793
        output.append(node_output)
3794

    
3795
    return output
3796

    
3797

    
3798
class LUQueryNodeStorage(NoHooksLU):
3799
  """Logical unit for getting information on storage units on node(s).
3800

3801
  """
3802
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
3803
  _OP_PARAMS = [
3804
    _POutputFields,
3805
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3806
    ("storage_type", ht.NoDefault, _CheckStorageType),
3807
    ("name", None, ht.TMaybeString),
3808
    ]
3809
  REQ_BGL = False
3810

    
3811
  def CheckArguments(self):
3812
    _CheckOutputFields(static=self._FIELDS_STATIC,
3813
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
3814
                       selected=self.op.output_fields)
3815

    
3816
  def ExpandNames(self):
3817
    self.needed_locks = {}
3818
    self.share_locks[locking.LEVEL_NODE] = 1
3819

    
3820
    if self.op.nodes:
3821
      self.needed_locks[locking.LEVEL_NODE] = \
3822
        _GetWantedNodes(self, self.op.nodes)
3823
    else:
3824
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3825

    
3826
  def Exec(self, feedback_fn):
3827
    """Computes the list of nodes and their attributes.
3828

3829
    """
3830
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
3831

    
3832
    # Always get name to sort by
3833
    if constants.SF_NAME in self.op.output_fields:
3834
      fields = self.op.output_fields[:]
3835
    else:
3836
      fields = [constants.SF_NAME] + self.op.output_fields
3837

    
3838
    # Never ask for node or type as it's only known to the LU
3839
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
3840
      while extra in fields:
3841
        fields.remove(extra)
3842

    
3843
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
3844
    name_idx = field_idx[constants.SF_NAME]
3845
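    # Example (field names assumed to be valid storage fields): with
    # output_fields=["used", "free"] the RPC is asked for
    # ["name", "used", "free"], name_idx becomes 0, and each result row can
    # then be re-keyed by its name column for the sorted output below.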

    
3846
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3847
    data = self.rpc.call_storage_list(self.nodes,
3848
                                      self.op.storage_type, st_args,
3849
                                      self.op.name, fields)
3850

    
3851
    result = []
3852

    
3853
    for node in utils.NiceSort(self.nodes):
3854
      nresult = data[node]
3855
      if nresult.offline:
3856
        continue
3857

    
3858
      msg = nresult.fail_msg
3859
      if msg:
3860
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
3861
        continue
3862

    
3863
      rows = dict([(row[name_idx], row) for row in nresult.payload])
3864

    
3865
      for name in utils.NiceSort(rows.keys()):
3866
        row = rows[name]
3867

    
3868
        out = []
3869

    
3870
        for field in self.op.output_fields:
3871
          if field == constants.SF_NODE:
3872
            val = node
3873
          elif field == constants.SF_TYPE:
3874
            val = self.op.storage_type
3875
          elif field in field_idx:
3876
            val = row[field_idx[field]]
3877
          else:
3878
            raise errors.ParameterError(field)
3879

    
3880
          out.append(val)
3881

    
3882
        result.append(out)
3883

    
3884
    return result
3885

    
3886

    
3887
class _InstanceQuery(_QueryBase):
3888
  FIELDS = query.INSTANCE_FIELDS
3889

    
3890
  def ExpandNames(self, lu):
3891
    lu.needed_locks = {}
3892
    lu.share_locks[locking.LEVEL_INSTANCE] = 1
3893
    lu.share_locks[locking.LEVEL_NODE] = 1
3894

    
3895
    if self.names:
3896
      self.wanted = _GetWantedInstances(lu, self.names)
3897
    else:
3898
      self.wanted = locking.ALL_SET
3899

    
3900
    self.do_locking = (self.use_locking and
3901
                       query.IQ_LIVE in self.requested_data)
3902
    if self.do_locking:
3903
      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3904
      lu.needed_locks[locking.LEVEL_NODE] = []
3905
      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3906

    
3907
  def DeclareLocks(self, lu, level):
3908
    if level == locking.LEVEL_NODE and self.do_locking:
3909
      lu._LockInstancesNodes() # pylint: disable-msg=W0212
3910

    
3911
  def _GetQueryData(self, lu):
3912
    """Computes the list of instances and their attributes.
3913

3914
    """
3915
    all_info = lu.cfg.GetAllInstancesInfo()
3916

    
3917
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
3918

    
3919
    instance_list = [all_info[name] for name in instance_names]
3920
    nodes = frozenset([inst.primary_node for inst in instance_list])
3921
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3922
    bad_nodes = []
3923
    offline_nodes = []
3924

    
3925
    # Gather data as requested
3926
    if query.IQ_LIVE in self.requested_data:
3927
      live_data = {}
3928
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
3929
      for name in nodes:
3930
        result = node_data[name]
3931
        if result.offline:
3932
          # offline nodes will be in both lists
3933
          assert result.fail_msg
3934
          offline_nodes.append(name)
3935
        if result.fail_msg:
3936
          bad_nodes.append(name)
3937
        elif result.payload:
3938
          live_data.update(result.payload)
3939
        # else no instance is alive
3940
    else:
3941
      live_data = {}
3942

    
3943
    if query.IQ_DISKUSAGE in self.requested_data:
3944
      disk_usage = dict((inst.name,
3945
                         _ComputeDiskSize(inst.disk_template,
3946
                                          [{"size": disk.size}
3947
                                           for disk in inst.disks]))
3948
                        for inst in instance_list)
3949
    else:
3950
      disk_usage = None
3951

    
3952
    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
3953
                                   disk_usage, offline_nodes, bad_nodes,
3954
                                   live_data)
3955

    
3956

    
3957
#: Query type implementations
3958
_QUERY_IMPL = {
3959
  constants.QR_INSTANCE: _InstanceQuery,
3960
  constants.QR_NODE: _NodeQuery,
3961
  }
3962

    
3963

    
3964
def _GetQueryImplementation(name):
3965
  """Returns the implemtnation for a query type.
3966

3967
  @param name: Query type, must be one of L{constants.QR_OP_QUERY}
3968

3969
  """
3970
  try:
3971
    return _QUERY_IMPL[name]
3972
  except KeyError:
3973
    raise errors.OpPrereqError("Unknown query resource '%s'" % name,
3974
                               errors.ECODE_INVAL)
3975
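# Sketch of how the query implementations are used (hostname and field names
# are illustrative only):
#   qcls = _GetQueryImplementation(constants.QR_NODE)       # -> _NodeQuery
#   impl = qcls(["node1.example.com"], ["name", "pinst_cnt"], False)
#   # then impl.ExpandNames(lu) / impl.NewStyleQuery(lu), as LUQuery does below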

    
3976

    
3977
class LUQuery(NoHooksLU):
3978
  """Query for resources/items of a certain kind.
3979

3980
  """
3981
  # pylint: disable-msg=W0142
3982
  _OP_PARAMS = [
3983
    ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)),
3984
    ("fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
3985
    ("filter", None, ht.TOr(ht.TNone,
3986
                            ht.TListOf(ht.TOr(ht.TNonEmptyString, ht.TList)))),
3987
    ]
3988
  REQ_BGL = False
3989

    
3990
  def CheckArguments(self):
3991
    qcls = _GetQueryImplementation(self.op.what)
3992
    names = qlang.ReadSimpleFilter("name", self.op.filter)
3993

    
3994
    self.impl = qcls(names, self.op.fields, False)
3995

    
3996
  def ExpandNames(self):
3997
    self.impl.ExpandNames(self)
3998

    
3999
  def DeclareLocks(self, level):
4000
    self.impl.DeclareLocks(self, level)
4001

    
4002
  def Exec(self, feedback_fn):
4003
    return self.impl.NewStyleQuery(self)
4004

    
4005

    
4006
class LUQueryFields(NoHooksLU):
4007
  """Query for resources/items of a certain kind.
4008

4009
  """
4010
  # pylint: disable-msg=W0142
4011
  _OP_PARAMS = [
4012
    ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)),
4013
    ("fields", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))),
4014
    ]
4015
  REQ_BGL = False
4016

    
4017
  def CheckArguments(self):
4018
    self.qcls = _GetQueryImplementation(self.op.what)
4019

    
4020
  def ExpandNames(self):
4021
    self.needed_locks = {}
4022

    
4023
  def Exec(self, feedback_fn):
4024
    return self.qcls.FieldsQuery(self.op.fields)
4025

    
4026

    
4027
class LUModifyNodeStorage(NoHooksLU):
4028
  """Logical unit for modifying a storage volume on a node.
4029

4030
  """
4031
  _OP_PARAMS = [
4032
    _PNodeName,
4033
    ("storage_type", ht.NoDefault, _CheckStorageType),
4034
    ("name", ht.NoDefault, ht.TNonEmptyString),
4035
    ("changes", ht.NoDefault, ht.TDict),
4036
    ]
4037
  REQ_BGL = False
4038

    
4039
  def CheckArguments(self):
4040
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4041

    
4042
    storage_type = self.op.storage_type
4043

    
4044
    try:
4045
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
4046
    except KeyError:
4047
      raise errors.OpPrereqError("Storage units of type '%s' cannot be"
4048
                                 " modified" % storage_type,
4049
                                 errors.ECODE_INVAL)
4050

    
4051
    diff = set(self.op.changes.keys()) - modifiable
4052
    if diff:
4053
      raise errors.OpPrereqError("The following fields cannot be modified for"
4054
                                 " storage units of type '%s': %r" %
4055
                                 (storage_type, list(diff)),
4056
                                 errors.ECODE_INVAL)
4057
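    # Example (assuming LVM physical volumes only expose the "allocatable"
    # flag as modifiable): an opcode with storage_type=lvm-pv and
    # changes={"allocatable": False} passes this check, while any other key
    # is rejected above.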

    
4058
  def ExpandNames(self):
4059
    self.needed_locks = {
4060
      locking.LEVEL_NODE: self.op.node_name,
4061
      }
4062

    
4063
  def Exec(self, feedback_fn):
4064
    """Computes the list of nodes and their attributes.
4065

4066
    """
4067
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
4068
    result = self.rpc.call_storage_modify(self.op.node_name,
4069
                                          self.op.storage_type, st_args,
4070
                                          self.op.name, self.op.changes)
4071
    result.Raise("Failed to modify storage unit '%s' on %s" %