#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import ht

import ganeti.masterd.instance # pylint: disable-msg=W0611

# Common opcode attributes

#: output fields for a query operation
_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))


#: the shutdown timeout
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
                     ht.TPositiveInt)

#: the force parameter
_PForce = ("force", False, ht.TBool)

#: a required instance name (for single-instance LUs)
_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)

#: Whether to ignore offline nodes
_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)

#: a required node name (for single-node LUs)
_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)

#: the migration type (live/non-live)
_PMigrationMode = ("mode", None,
                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))

#: the obsolete 'live' mode (boolean)
_PMigrationLive = ("live", None, ht.TMaybeBool)


# End types
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)
  @cvar _OP_PARAMS: a list of opcode attributes, the default values
      they should get if not already defined, and the types they must match

  """
  HPATH = None
  HTYPE = None
  _OP_PARAMS = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.Log = processor.Log # pylint: disable-msg=C0103
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # The new kind-of-type-system
    op_id = self.op.OP_ID
    for attr_name, aval, test in self._OP_PARAMS:
      if not hasattr(op, attr_name):
        if aval == ht.NoDefault:
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
                                     (op_id, attr_name), errors.ECODE_INVAL)
        else:
          if callable(aval):
            dval = aval()
          else:
            dval = aval
          setattr(self.op, attr_name, dval)
      attr_val = getattr(op, attr_name)
      if test == ht.NoType:
        # no tests here
        continue
      if not callable(test):
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
                                     " given type is not a proper type (%s)" %
                                     (op_id, attr_name, test))
      if not test(attr_val):
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
                                   (op_id, attr_name), errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and "could
    # be a function" warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]

class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLu.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError

def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is of a wrong type

  """
  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of a wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted

def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
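  # Illustrative example (added comment): with use_default=True an
  # update_dict of {"a": constants.VALUE_DEFAULT, "c": 3} applied to
  # old_params {"a": 1, "b": 2} drops "a", keeps "b" and adds "c",
  # returning {"b": 2, "c": 3}.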
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)

def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()
  return True


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()
  return True


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")

def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142

def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  variant = objects.OS.GetVariant(name)
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
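  # Illustrative note (added comment; the actual slot names depend on the
  # calling opcode): a call like _CheckIAllocatorOrNode(lu, "iallocator",
  # "pnode") rejects opcodes that set both slots, and fills the iallocator
  # slot from the cluster-wide default when neither slot is set.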
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found."
                                 " Please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator.")

class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    return master


def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)

class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_PARAMS = [
    ("skip_checks", ht.EmptyList,
     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
    ("verbose", False, ht.TBool),
    ("error_codes", False, ht.TBool),
    ("debug_simulate_errors", False, ht.TBool),
    ]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEOS = (TNODE, "ENODEOS")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dict of {secondary-node: list of instances} of all peers
        of this node (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM data.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

1508
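  # Shapes consumed above (inferred from the checks, names assumed as an
  # example): NV_NODELIST and NV_NODENETTEST map peer node names to error
  # messages for the peers that failed, e.g.
  #   nresult[constants.NV_NODELIST] == {"node2.example.com": "ssh timed out"}
  # so an empty dict means "all peers reachable", while NV_MASTERIP is a
  # plain boolean telling whether the master IP answered.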
  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      _ErrorIf(instanceconfig.admin_up and not success,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

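  # Illustrative shape of the "diskstatus" argument above (node and payload
  # values are assumed examples): one entry per node holding disks of the
  # instance, each a list of (success, payload) pairs indexed by disk, e.g.
  #   {"node1.example.com": [(True, <BlockDevStatus>), (False, "not found")]}
  # which the list comprehension flattens into (node, success, payload, idx)
  # tuples before the per-disk checks.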
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

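  # Worked example for the N+1 check above (numbers assumed): if node2 holds
  # the secondary disks of instances A (BE_MEMORY=2048) and B (BE_MEMORY=4096)
  # whose primary is node1, and both have BE_AUTO_BALANCE set, then
  # needed_mem for the (node2, node1) pair is 6144 MiB; with n_img.mfree of
  # only 4096 MiB on node2 the ENODEN1 error fires, since node2 could not
  # absorb a failover of node1.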
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1620
                       master_files):
1621
    """Verifies and computes the node required file checksums.
1622

1623
    @type ninfo: L{objects.Node}
1624
    @param ninfo: the node to check
1625
    @param nresult: the remote results for the node
1626
    @param file_list: required list of files
1627
    @param local_cksum: dictionary of local files and their checksums
1628
    @param master_files: list of files that only masters should have
1629

1630
    """
1631
    node = ninfo.name
1632
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1633

    
1634
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1635
    test = not isinstance(remote_cksum, dict)
1636
    _ErrorIf(test, self.ENODEFILECHECK, node,
1637
             "node hasn't returned file checksum data")
1638
    if test:
1639
      return
1640

    
1641
    for file_name in file_list:
1642
      node_is_mc = ninfo.master_candidate
1643
      must_have = (file_name not in master_files) or node_is_mc
1644
      # missing
1645
      test1 = file_name not in remote_cksum
1646
      # invalid checksum
1647
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1648
      # existing and good
1649
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1650
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1651
               "file '%s' missing", file_name)
1652
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1653
               "file '%s' has wrong checksum", file_name)
1654
      # not candidate and this is not a must-have file
1655
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1656
               "file '%s' should not exist on non master"
1657
               " candidates (and the file is outdated)", file_name)
1658
      # all good, except non-master/non-must have combination
1659
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1660
               "file '%s' should not exist"
1661
               " on non master candidates", file_name)
1662

    
1663
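  # Summary of the file-check cases above: test1 = file missing on the node,
  # test2 = present but checksum differs from the master's copy, test3 =
  # present and identical; must_have is true for every file except the
  # master-only ones on non-candidate nodes.  Missing or stale must-have
  # files are errors, while any copy of a master-only file on a plain node
  # is flagged as something that should not exist there.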
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1664
                      drbd_map):
1665
    """Verifies and the node DRBD status.
1666

1667
    @type ninfo: L{objects.Node}
1668
    @param ninfo: the node to check
1669
    @param nresult: the remote results for the node
1670
    @param instanceinfo: the dict of instances
1671
    @param drbd_helper: the configured DRBD usermode helper
1672
    @param drbd_map: the DRBD map as returned by
1673
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1674

1675
    """
1676
    node = ninfo.name
1677
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1678

    
1679
    if drbd_helper:
1680
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1681
      test = (helper_result is None)
1682
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1683
               "no drbd usermode helper returned")
1684
      if helper_result:
1685
        status, payload = helper_result
1686
        test = not status
1687
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1688
                 "drbd usermode helper check unsuccessful: %s", payload)
1689
        test = status and (payload != drbd_helper)
1690
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1691
                 "wrong drbd usermode helper: %s", payload)
1692

    
1693
    # compute the DRBD minors
1694
    node_drbd = {}
1695
    for minor, instance in drbd_map[node].items():
1696
      test = instance not in instanceinfo
1697
      _ErrorIf(test, self.ECLUSTERCFG, None,
1698
               "ghost instance '%s' in temporary DRBD map", instance)
1699
        # ghost instance should not be running, but otherwise we
1700
        # don't give double warnings (both ghost instance and
1701
        # unallocated minor in use)
1702
      if test:
1703
        node_drbd[minor] = (instance, False)
1704
      else:
1705
        instance = instanceinfo[instance]
1706
        node_drbd[minor] = (instance.name, instance.admin_up)
1707

    
1708
    # and now check them
1709
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1710
    test = not isinstance(used_minors, (tuple, list))
1711
    _ErrorIf(test, self.ENODEDRBD, node,
1712
             "cannot parse drbd status file: %s", str(used_minors))
1713
    if test:
1714
      # we cannot check drbd status
1715
      return
1716

    
1717
    for minor, (iname, must_exist) in node_drbd.items():
1718
      test = minor not in used_minors and must_exist
1719
      _ErrorIf(test, self.ENODEDRBD, node,
1720
               "drbd minor %d of instance %s is not active", minor, iname)
1721
    for minor in used_minors:
1722
      test = minor not in node_drbd
1723
      _ErrorIf(test, self.ENODEDRBD, node,
1724
               "unallocated drbd minor %d is in use", minor)
1725

    
1726
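  # Illustrative shapes for the DRBD check above (names assumed):
  # drbd_map[node] maps minor numbers to instance names, e.g. {0: "inst1"},
  # node_drbd is rebuilt as {minor: (instance_name, must_be_active)}, and
  # NV_DRBDLIST is the list of minors the node actually has in use; the two
  # loops then flag minors that should be active but are not, and minors in
  # use that the configuration does not know about.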
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1727
    """Builds the node OS structures.
1728

1729
    @type ninfo: L{objects.Node}
1730
    @param ninfo: the node to check
1731
    @param nresult: the remote results for the node
1732
    @param nimg: the node image object
1733

1734
    """
1735
    node = ninfo.name
1736
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1737

    
1738
    remote_os = nresult.get(constants.NV_OSLIST, None)
1739
    test = (not isinstance(remote_os, list) or
1740
            not compat.all(isinstance(v, list) and len(v) == 7
1741
                           for v in remote_os))
1742

    
1743
    _ErrorIf(test, self.ENODEOS, node,
1744
             "node hasn't returned valid OS data")
1745

    
1746
    nimg.os_fail = test
1747

    
1748
    if test:
1749
      return
1750

    
1751
    os_dict = {}
1752

    
1753
    for (name, os_path, status, diagnose,
1754
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1755

    
1756
      if name not in os_dict:
1757
        os_dict[name] = []
1758

    
1759
      # parameters is a list of lists instead of list of tuples due to
1760
      # JSON lacking a real tuple type, fix it:
1761
      parameters = [tuple(v) for v in parameters]
1762
      os_dict[name].append((os_path, status, diagnose,
1763
                            set(variants), set(parameters), set(api_ver)))
1764

    
1765
    nimg.oslist = os_dict
1766

    
1767
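  # Illustrative shape of the structure built above (OS name and values
  # assumed): each NV_OSLIST entry is a 7-element list and nimg.oslist
  # groups them by name, e.g.
  #   nimg.oslist["debootstrap"] ==
  #     [("/srv/ganeti/os/debootstrap", True, "", set(["default"]),
  #       set([("dhcp", "whether to use dhcp")]), set([20]))]
  # i.e. (path, status, diagnose, variants, parameters, api_versions).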
  def _VerifyNodeOS(self, ninfo, nimg, base):
1768
    """Verifies the node OS list.
1769

1770
    @type ninfo: L{objects.Node}
1771
    @param ninfo: the node to check
1772
    @param nimg: the node image object
1773
    @param base: the 'template' node we match against (e.g. from the master)
1774

1775
    """
1776
    node = ninfo.name
1777
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1778

    
1779
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1780

    
1781
    for os_name, os_data in nimg.oslist.items():
1782
      assert os_data, "Empty OS status for OS %s?!" % os_name
1783
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1784
      _ErrorIf(not f_status, self.ENODEOS, node,
1785
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1786
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1787
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1788
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1789
      # this will be caught in the backend too
1790
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1791
               and not f_var, self.ENODEOS, node,
1792
               "OS %s with API at least %d does not declare any variant",
1793
               os_name, constants.OS_API_V15)
1794
      # comparisons with the 'base' image
1795
      test = os_name not in base.oslist
1796
      _ErrorIf(test, self.ENODEOS, node,
1797
               "Extra OS %s not present on reference node (%s)",
1798
               os_name, base.name)
1799
      if test:
1800
        continue
1801
      assert base.oslist[os_name], "Base node has empty OS status?"
1802
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1803
      if not b_status:
1804
        # base OS is invalid, skipping
1805
        continue
1806
      for kind, a, b in [("API version", f_api, b_api),
1807
                         ("variants list", f_var, b_var),
1808
                         ("parameters", f_param, b_param)]:
1809
        _ErrorIf(a != b, self.ENODEOS, node,
1810
                 "OS %s %s differs from reference node %s: %s vs. %s",
1811
                 kind, os_name, base.name,
1812
                 utils.CommaJoin(a), utils.CommaJoin(b))
1813

    
1814
    # check any missing OSes
1815
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1816
    _ErrorIf(missing, self.ENODEOS, node,
1817
             "OSes present on reference node %s but missing on this node: %s",
1818
             base.name, utils.CommaJoin(missing))
1819

    
1820
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1821
    """Verifies and updates the node volume data.
1822

1823
    This function will update a L{NodeImage}'s internal structures
1824
    with data from the remote call.
1825

1826
    @type ninfo: L{objects.Node}
1827
    @param ninfo: the node to check
1828
    @param nresult: the remote results for the node
1829
    @param nimg: the node image object
1830
    @param vg_name: the configured VG name
1831

1832
    """
1833
    node = ninfo.name
1834
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1835

    
1836
    nimg.lvm_fail = True
1837
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1838
    if vg_name is None:
1839
      pass
1840
    elif isinstance(lvdata, basestring):
1841
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1842
               utils.SafeEncode(lvdata))
1843
    elif not isinstance(lvdata, dict):
1844
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1845
    else:
1846
      nimg.volumes = lvdata
1847
      nimg.lvm_fail = False
1848

    
1849
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1850
    """Verifies and updates the node instance list.
1851

1852
    If the listing was successful, then updates this node's instance
1853
    list. Otherwise, it marks the RPC call as failed for the instance
1854
    list key.
1855

1856
    @type ninfo: L{objects.Node}
1857
    @param ninfo: the node to check
1858
    @param nresult: the remote results for the node
1859
    @param nimg: the node image object
1860

1861
    """
1862
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1863
    test = not isinstance(idata, list)
1864
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1865
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1866
    if test:
1867
      nimg.hyp_fail = True
1868
    else:
1869
      nimg.instances = idata
1870

    
1871
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1872
    """Verifies and computes a node information map
1873

1874
    @type ninfo: L{objects.Node}
1875
    @param ninfo: the node to check
1876
    @param nresult: the remote results for the node
1877
    @param nimg: the node image object
1878
    @param vg_name: the configured VG name
1879

1880
    """
1881
    node = ninfo.name
1882
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1883

    
1884
    # try to read free memory (from the hypervisor)
1885
    hv_info = nresult.get(constants.NV_HVINFO, None)
1886
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
1887
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1888
    if not test:
1889
      try:
1890
        nimg.mfree = int(hv_info["memory_free"])
1891
      except (ValueError, TypeError):
1892
        _ErrorIf(True, self.ENODERPC, node,
1893
                 "node returned invalid nodeinfo, check hypervisor")
1894

    
1895
    # FIXME: devise a free space model for file based instances as well
1896
    if vg_name is not None:
1897
      test = (constants.NV_VGLIST not in nresult or
1898
              vg_name not in nresult[constants.NV_VGLIST])
1899
      _ErrorIf(test, self.ENODELVM, node,
1900
               "node didn't return data for the volume group '%s'"
1901
               " - it is either missing or broken", vg_name)
1902
      if not test:
1903
        try:
1904
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
1905
        except (ValueError, TypeError):
1906
          _ErrorIf(True, self.ENODERPC, node,
1907
                   "node returned invalid LVM info, check LVM status")
1908

    
1909
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
1910
    """Gets per-disk status information for all instances.
1911

1912
    @type nodelist: list of strings
1913
    @param nodelist: Node names
1914
    @type node_image: dict of (name, L{objects.Node})
1915
    @param node_image: Node objects
1916
    @type instanceinfo: dict of (name, L{objects.Instance})
1917
    @param instanceinfo: Instance objects
1918
    @rtype: {instance: {node: [(success, payload)]}}
1919
    @return: a dictionary of per-instance dictionaries with nodes as
1920
        keys and disk information as values; the disk information is a
1921
        list of tuples (success, payload)
1922

1923
    """
1924
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1925

    
1926
    node_disks = {}
1927
    node_disks_devonly = {}
1928
    diskless_instances = set()
1929
    diskless = constants.DT_DISKLESS
1930

    
1931
    for nname in nodelist:
1932
      node_instances = list(itertools.chain(node_image[nname].pinst,
1933
                                            node_image[nname].sinst))
1934
      diskless_instances.update(inst for inst in node_instances
1935
                                if instanceinfo[inst].disk_template == diskless)
1936
      disks = [(inst, disk)
1937
               for inst in node_instances
1938
               for disk in instanceinfo[inst].disks]
1939

    
1940
      if not disks:
1941
        # No need to collect data
1942
        continue
1943

    
1944
      node_disks[nname] = disks
1945

    
1946
      # Creating copies as SetDiskID below will modify the objects and that can
1947
      # lead to incorrect data returned from nodes
1948
      devonly = [dev.Copy() for (_, dev) in disks]
1949

    
1950
      for dev in devonly:
1951
        self.cfg.SetDiskID(dev, nname)
1952

    
1953
      node_disks_devonly[nname] = devonly
1954

    
1955
    assert len(node_disks) == len(node_disks_devonly)
1956

    
1957
    # Collect data from all nodes with disks
1958
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
1959
                                                          node_disks_devonly)
1960

    
1961
    assert len(result) == len(node_disks)
1962

    
1963
    instdisk = {}
1964

    
1965
    for (nname, nres) in result.items():
1966
      disks = node_disks[nname]
1967

    
1968
      if nres.offline:
1969
        # No data from this node
1970
        data = len(disks) * [(False, "node offline")]
1971
      else:
1972
        msg = nres.fail_msg
1973
        _ErrorIf(msg, self.ENODERPC, nname,
1974
                 "while getting disk information: %s", msg)
1975
        if msg:
1976
          # No data from this node
1977
          data = len(disks) * [(False, msg)]
1978
        else:
1979
          data = []
1980
          for idx, i in enumerate(nres.payload):
1981
            if isinstance(i, (tuple, list)) and len(i) == 2:
1982
              data.append(i)
1983
            else:
1984
              logging.warning("Invalid result from node %s, entry %d: %s",
1985
                              nname, idx, i)
1986
              data.append((False, "Invalid result from the remote node"))
1987

    
1988
      for ((inst, _), status) in zip(disks, data):
1989
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
1990

    
1991
    # Add empty entries for diskless instances.
1992
    for inst in diskless_instances:
1993
      assert inst not in instdisk
1994
      instdisk[inst] = {}
1995

    
1996
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
1997
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
1998
                      compat.all(isinstance(s, (tuple, list)) and
1999
                                 len(s) == 2 for s in statuses)
2000
                      for inst, nnames in instdisk.items()
2001
                      for nname, statuses in nnames.items())
2002
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2003

    
2004
    return instdisk
2005

    
2006
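  # Illustrative shape of the returned "instdisk" mapping (names assumed):
  #   {"inst1.example.com": {"node1.example.com": [(True, <status>),
  #                                                (False, "node offline")]}}
  # with diskless instances mapped to an empty inner dict, as asserted above.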
  def BuildHooksEnv(self):
2007
    """Build hooks env.
2008

2009
    Cluster-Verify hooks just ran in the post phase and their failure makes
2010
    the output be logged in the verify output and the verification to fail.
2011

2012
    """
2013
    all_nodes = self.cfg.GetNodeList()
2014
    env = {
2015
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2016
      }
2017
    for node in self.cfg.GetAllNodesInfo().values():
2018
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
2019

    
2020
    return env, [], all_nodes
2021

    
2022
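  # Example of the environment built above (tag values assumed):
  #   env == {"CLUSTER_TAGS": "prod critical",
  #           "NODE_TAGS_node1.example.com": "rack1", ...}
  # The empty second element of the return value means no nodes are listed
  # for the pre phase, while the post-phase hooks run on all nodes.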
  def Exec(self, feedback_fn):
2023
    """Verify integrity of cluster, performing various test on nodes.
2024

2025
    """
2026
    self.bad = False
2027
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2028
    verbose = self.op.verbose
2029
    self._feedback_fn = feedback_fn
2030
    feedback_fn("* Verifying global settings")
2031
    for msg in self.cfg.VerifyConfig():
2032
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2033

    
2034
    # Check the cluster certificates
2035
    for cert_filename in constants.ALL_CERT_FILES:
2036
      (errcode, msg) = _VerifyCertificate(cert_filename)
2037
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2038

    
2039
    vg_name = self.cfg.GetVGName()
2040
    drbd_helper = self.cfg.GetDRBDHelper()
2041
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2042
    cluster = self.cfg.GetClusterInfo()
2043
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2044
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2045
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2046
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2047
                        for iname in instancelist)
2048
    i_non_redundant = [] # Non redundant instances
2049
    i_non_a_balanced = [] # Non auto-balanced instances
2050
    n_offline = 0 # Count of offline nodes
2051
    n_drained = 0 # Count of nodes being drained
2052
    node_vol_should = {}
2053

    
2054
    # FIXME: verify OS list
2055
    # do local checksums
2056
    master_files = [constants.CLUSTER_CONF_FILE]
2057
    master_node = self.master_node = self.cfg.GetMasterNode()
2058
    master_ip = self.cfg.GetMasterIP()
2059

    
2060
    file_names = ssconf.SimpleStore().GetFileList()
2061
    file_names.extend(constants.ALL_CERT_FILES)
2062
    file_names.extend(master_files)
2063
    if cluster.modify_etc_hosts:
2064
      file_names.append(constants.ETC_HOSTS)
2065

    
2066
    local_checksums = utils.FingerprintFiles(file_names)
2067

    
2068
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2069
    node_verify_param = {
2070
      constants.NV_FILELIST: file_names,
2071
      constants.NV_NODELIST: [node.name for node in nodeinfo
2072
                              if not node.offline],
2073
      constants.NV_HYPERVISOR: hypervisors,
2074
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2075
                                  node.secondary_ip) for node in nodeinfo
2076
                                 if not node.offline],
2077
      constants.NV_INSTANCELIST: hypervisors,
2078
      constants.NV_VERSION: None,
2079
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2080
      constants.NV_NODESETUP: None,
2081
      constants.NV_TIME: None,
2082
      constants.NV_MASTERIP: (master_node, master_ip),
2083
      constants.NV_OSLIST: None,
2084
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2085
      }
2086

    
2087
    if vg_name is not None:
2088
      node_verify_param[constants.NV_VGLIST] = None
2089
      node_verify_param[constants.NV_LVLIST] = vg_name
2090
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2091
      node_verify_param[constants.NV_DRBDLIST] = None
2092

    
2093
    if drbd_helper:
2094
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2095

    
2096
    # Build our expected cluster state
2097
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2098
                                                 name=node.name,
2099
                                                 vm_capable=node.vm_capable))
2100
                      for node in nodeinfo)
2101

    
2102
    for instance in instancelist:
2103
      inst_config = instanceinfo[instance]
2104

    
2105
      for nname in inst_config.all_nodes:
2106
        if nname not in node_image:
2107
          # ghost node
2108
          gnode = self.NodeImage(name=nname)
2109
          gnode.ghost = True
2110
          node_image[nname] = gnode
2111

    
2112
      inst_config.MapLVsByNode(node_vol_should)
2113

    
2114
      pnode = inst_config.primary_node
2115
      node_image[pnode].pinst.append(instance)
2116

    
2117
      for snode in inst_config.secondary_nodes:
2118
        nimg = node_image[snode]
2119
        nimg.sinst.append(instance)
2120
        if pnode not in nimg.sbp:
2121
          nimg.sbp[pnode] = []
2122
        nimg.sbp[pnode].append(instance)
2123

    
2124
    # At this point, we have the in-memory data structures complete,
2125
    # except for the runtime information, which we'll gather next
2126

    
2127
    # Due to the way our RPC system works, exact response times cannot be
2128
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2129
    # time before and after executing the request, we can at least have a time
2130
    # window.
2131
    nvinfo_starttime = time.time()
2132
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2133
                                           self.cfg.GetClusterName())
2134
    nvinfo_endtime = time.time()
2135

    
2136
    all_drbd_map = self.cfg.ComputeDRBDMap()
2137

    
2138
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2139
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2140

    
2141
    feedback_fn("* Verifying node status")
2142

    
2143
    refos_img = None
2144

    
2145
    for node_i in nodeinfo:
2146
      node = node_i.name
2147
      nimg = node_image[node]
2148

    
2149
      if node_i.offline:
2150
        if verbose:
2151
          feedback_fn("* Skipping offline node %s" % (node,))
2152
        n_offline += 1
2153
        continue
2154

    
2155
      if node == master_node:
2156
        ntype = "master"
2157
      elif node_i.master_candidate:
2158
        ntype = "master candidate"
2159
      elif node_i.drained:
2160
        ntype = "drained"
2161
        n_drained += 1
2162
      else:
2163
        ntype = "regular"
2164
      if verbose:
2165
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2166

    
2167
      msg = all_nvinfo[node].fail_msg
2168
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2169
      if msg:
2170
        nimg.rpc_fail = True
2171
        continue
2172

    
2173
      nresult = all_nvinfo[node].payload
2174

    
2175
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2176
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2177
      self._VerifyNodeNetwork(node_i, nresult)
2178
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2179
                            master_files)
2180

    
2181
      if nimg.vm_capable:
2182
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2183
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2184
                             all_drbd_map)
2185

    
2186
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2187
        self._UpdateNodeInstances(node_i, nresult, nimg)
2188
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2189
        self._UpdateNodeOS(node_i, nresult, nimg)
2190
        if not nimg.os_fail:
2191
          if refos_img is None:
2192
            refos_img = nimg
2193
          self._VerifyNodeOS(node_i, nimg, refos_img)
2194

    
2195
    feedback_fn("* Verifying instance status")
2196
    for instance in instancelist:
2197
      if verbose:
2198
        feedback_fn("* Verifying instance %s" % instance)
2199
      inst_config = instanceinfo[instance]
2200
      self._VerifyInstance(instance, inst_config, node_image,
2201
                           instdisk[instance])
2202
      inst_nodes_offline = []
2203

    
2204
      pnode = inst_config.primary_node
2205
      pnode_img = node_image[pnode]
2206
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2207
               self.ENODERPC, pnode, "instance %s, connection to"
2208
               " primary node failed", instance)
2209

    
2210
      if pnode_img.offline:
2211
        inst_nodes_offline.append(pnode)
2212

    
2213
      # If the instance is non-redundant we cannot survive losing its primary
2214
      # node, so we are not N+1 compliant. On the other hand we have no disk
2215
      # templates with more than one secondary so that situation is not well
2216
      # supported either.
2217
      # FIXME: does not support file-backed instances
2218
      if not inst_config.secondary_nodes:
2219
        i_non_redundant.append(instance)
2220
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2221
               instance, "instance has multiple secondary nodes: %s",
2222
               utils.CommaJoin(inst_config.secondary_nodes),
2223
               code=self.ETYPE_WARNING)
2224

    
2225
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2226
        i_non_a_balanced.append(instance)
2227

    
2228
      for snode in inst_config.secondary_nodes:
2229
        s_img = node_image[snode]
2230
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2231
                 "instance %s, connection to secondary node failed", instance)
2232

    
2233
        if s_img.offline:
2234
          inst_nodes_offline.append(snode)
2235

    
2236
      # warn that the instance lives on offline nodes
2237
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2238
               "instance lives on offline node(s) %s",
2239
               utils.CommaJoin(inst_nodes_offline))
2240
      # ... or ghost/non-vm_capable nodes
2241
      for node in inst_config.all_nodes:
2242
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2243
                 "instance lives on ghost node %s", node)
2244
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2245
                 instance, "instance lives on non-vm_capable node %s", node)
2246

    
2247
    feedback_fn("* Verifying orphan volumes")
2248
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2249
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2250

    
2251
    feedback_fn("* Verifying orphan instances")
2252
    self._VerifyOrphanInstances(instancelist, node_image)
2253

    
2254
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2255
      feedback_fn("* Verifying N+1 Memory redundancy")
2256
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2257

    
2258
    feedback_fn("* Other Notes")
2259
    if i_non_redundant:
2260
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2261
                  % len(i_non_redundant))
2262

    
2263
    if i_non_a_balanced:
2264
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2265
                  % len(i_non_a_balanced))
2266

    
2267
    if n_offline:
2268
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2269

    
2270
    if n_drained:
2271
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2272

    
2273
    return not self.bad
2274

    
2275
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2276
    """Analyze the post-hooks' result
2277

2278
    This method analyses the hook result, handles it, and sends some
2279
    nicely-formatted feedback back to the user.
2280

2281
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2282
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2283
    @param hooks_results: the results of the multi-node hooks rpc call
2284
    @param feedback_fn: function used to send feedback back to the caller
2285
    @param lu_result: previous Exec result
2286
    @return: the new Exec result, based on the previous result
2287
        and hook results
2288

2289
    """
2290
    # We only really run POST phase hooks, and are only interested in
2291
    # their results
2292
    if phase == constants.HOOKS_PHASE_POST:
2293
      # Used to change hooks' output to proper indentation
2294
      indent_re = re.compile('^', re.M)
2295
      feedback_fn("* Hooks Results")
2296
      assert hooks_results, "invalid result from hooks"
2297

    
2298
      for node_name in hooks_results:
2299
        res = hooks_results[node_name]
2300
        msg = res.fail_msg
2301
        test = msg and not res.offline
2302
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2303
                      "Communication failure in hooks execution: %s", msg)
2304
        if res.offline or msg:
2305
          # No need to investigate payload if node is offline or gave an error.
2306
          # override manually lu_result here as _ErrorIf only
2307
          # overrides self.bad
2308
          lu_result = 1
2309
          continue
2310
        for script, hkr, output in res.payload:
2311
          test = hkr == constants.HKR_FAIL
2312
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2313
                        "Script %s failed, output:", script)
2314
          if test:
2315
            output = indent_re.sub('      ', output)
2316
            feedback_fn("%s" % output)
2317
            lu_result = 0
2318

    
2319
      return lu_result
2320

    
2321

    
2322
class LUVerifyDisks(NoHooksLU):
2323
  """Verifies the cluster disks status.
2324

2325
  """
2326
  REQ_BGL = False
2327

    
2328
  def ExpandNames(self):
2329
    self.needed_locks = {
2330
      locking.LEVEL_NODE: locking.ALL_SET,
2331
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2332
    }
2333
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2334

    
2335
  def Exec(self, feedback_fn):
2336
    """Verify integrity of cluster disks.
2337

2338
    @rtype: tuple of three items
2339
    @return: a tuple of (dict of node-to-node_error, list of instances
2340
        which need activate-disks, dict of instance: (node, volume) for
2341
        missing volumes)
2342

2343
    """
2344
    result = res_nodes, res_instances, res_missing = {}, [], {}
2345

    
2346
    vg_name = self.cfg.GetVGName()
2347
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2348
    instances = [self.cfg.GetInstanceInfo(name)
2349
                 for name in self.cfg.GetInstanceList()]
2350

    
2351
    nv_dict = {}
2352
    for inst in instances:
2353
      inst_lvs = {}
2354
      if (not inst.admin_up or
2355
          inst.disk_template not in constants.DTS_NET_MIRROR):
2356
        continue
2357
      inst.MapLVsByNode(inst_lvs)
2358
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2359
      for node, vol_list in inst_lvs.iteritems():
2360
        for vol in vol_list:
2361
          nv_dict[(node, vol)] = inst
2362

    
2363
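    # Illustrative result of the transformation above (names assumed):
    #   nv_dict == {("node1.example.com", "xenvg/disk0"): <Instance inst1>,
    #               ("node2.example.com", "xenvg/disk0"): <Instance inst1>}
    # i.e. one entry per (node, logical volume) pair that ought to exist for
    # an instance that is marked up and uses a network-mirrored template.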
    if not nv_dict:
2364
      return result
2365

    
2366
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
2367

    
2368
    for node in nodes:
2369
      # node_volume
2370
      node_res = node_lvs[node]
2371
      if node_res.offline:
2372
        continue
2373
      msg = node_res.fail_msg
2374
      if msg:
2375
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2376
        res_nodes[node] = msg
2377
        continue
2378

    
2379
      lvs = node_res.payload
2380
      for lv_name, (_, _, lv_online) in lvs.items():
2381
        inst = nv_dict.pop((node, lv_name), None)
2382
        if (not lv_online and inst is not None
2383
            and inst.name not in res_instances):
2384
          res_instances.append(inst.name)
2385

    
2386
    # any leftover items in nv_dict are missing LVs, let's arrange the
2387
    # data better
2388
    for key, inst in nv_dict.iteritems():
2389
      if inst.name not in res_missing:
2390
        res_missing[inst.name] = []
2391
      res_missing[inst.name].append(key)
2392

    
2393
    return result
2394

    
2395

    
2396
class LURepairDiskSizes(NoHooksLU):
2397
  """Verifies the cluster disks sizes.
2398

2399
  """
2400
  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
2401
  REQ_BGL = False
2402

    
2403
  def ExpandNames(self):
2404
    if self.op.instances:
2405
      self.wanted_names = []
2406
      for name in self.op.instances:
2407
        full_name = _ExpandInstanceName(self.cfg, name)
2408
        self.wanted_names.append(full_name)
2409
      self.needed_locks = {
2410
        locking.LEVEL_NODE: [],
2411
        locking.LEVEL_INSTANCE: self.wanted_names,
2412
        }
2413
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2414
    else:
2415
      self.wanted_names = None
2416
      self.needed_locks = {
2417
        locking.LEVEL_NODE: locking.ALL_SET,
2418
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2419
        }
2420
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2421

    
2422
  def DeclareLocks(self, level):
2423
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2424
      self._LockInstancesNodes(primary_only=True)
2425

    
2426
  def CheckPrereq(self):
2427
    """Check prerequisites.
2428

2429
    This only checks the optional instance list against the existing names.
2430

2431
    """
2432
    if self.wanted_names is None:
2433
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2434

    
2435
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2436
                             in self.wanted_names]
2437

    
2438
  def _EnsureChildSizes(self, disk):
2439
    """Ensure children of the disk have the needed disk size.
2440

2441
    This is valid mainly for DRBD8 and fixes an issue where the
2442
    children have smaller disk size.
2443

2444
    @param disk: an L{ganeti.objects.Disk} object
2445

2446
    """
2447
    if disk.dev_type == constants.LD_DRBD8:
2448
      assert disk.children, "Empty children for DRBD8?"
2449
      fchild = disk.children[0]
2450
      mismatch = fchild.size < disk.size
2451
      if mismatch:
2452
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2453
                     fchild.size, disk.size)
2454
        fchild.size = disk.size
2455

    
2456
      # and we recurse on this child only, not on the metadev
2457
      return self._EnsureChildSizes(fchild) or mismatch
2458
    else:
2459
      return False
2460

    
2461
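  # Illustrative case for _EnsureChildSizes above (sizes assumed): a DRBD8
  # disk recorded at 10240 MiB whose data child LV is recorded at 10239 MiB
  # gets the child bumped to 10240 MiB, and the mismatch is reported back so
  # that Exec() writes the updated configuration.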
  def Exec(self, feedback_fn):
2462
    """Verify the size of cluster disks.
2463

2464
    """
2465
    # TODO: check child disks too
2466
    # TODO: check differences in size between primary/secondary nodes
2467
    per_node_disks = {}
2468
    for instance in self.wanted_instances:
2469
      pnode = instance.primary_node
2470
      if pnode not in per_node_disks:
2471
        per_node_disks[pnode] = []
2472
      for idx, disk in enumerate(instance.disks):
2473
        per_node_disks[pnode].append((instance, idx, disk))
2474

    
2475
    changed = []
2476
    for node, dskl in per_node_disks.items():
2477
      newl = [v[2].Copy() for v in dskl]
2478
      for dsk in newl:
2479
        self.cfg.SetDiskID(dsk, node)
2480
      result = self.rpc.call_blockdev_getsizes(node, newl)
2481
      if result.fail_msg:
2482
        self.LogWarning("Failure in blockdev_getsizes call to node"
2483
                        " %s, ignoring", node)
2484
        continue
2485
      if len(result.data) != len(dskl):
2486
        self.LogWarning("Invalid result from node %s, ignoring node results",
2487
                        node)
2488
        continue
2489
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2490
        if size is None:
2491
          self.LogWarning("Disk %d of instance %s did not return size"
2492
                          " information, ignoring", idx, instance.name)
2493
          continue
2494
        if not isinstance(size, (int, long)):
2495
          self.LogWarning("Disk %d of instance %s did not return valid"
2496
                          " size information, ignoring", idx, instance.name)
2497
          continue
2498
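        # the node apparently reports sizes in bytes; shifting right by 20
        # converts to MiB to match disk.size (e.g. 10737418240 >> 20 == 10240)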
        size = size >> 20
2499
        if size != disk.size:
2500
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2501
                       " correcting: recorded %d, actual %d", idx,
2502
                       instance.name, disk.size, size)
2503
          disk.size = size
2504
          self.cfg.Update(instance, feedback_fn)
2505
          changed.append((instance.name, idx, size))
2506
        if self._EnsureChildSizes(disk):
2507
          self.cfg.Update(instance, feedback_fn)
2508
          changed.append((instance.name, idx, disk.size))
2509
    return changed
2510

    
2511

    
2512
class LURenameCluster(LogicalUnit):
2513
  """Rename the cluster.
2514

2515
  """
2516
  HPATH = "cluster-rename"
2517
  HTYPE = constants.HTYPE_CLUSTER
2518
  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
2519

    
2520
  def BuildHooksEnv(self):
2521
    """Build hooks env.
2522

2523
    """
2524
    env = {
2525
      "OP_TARGET": self.cfg.GetClusterName(),
2526
      "NEW_NAME": self.op.name,
2527
      }
2528
    mn = self.cfg.GetMasterNode()
2529
    all_nodes = self.cfg.GetNodeList()
2530
    return env, [mn], all_nodes
2531

    
2532
  def CheckPrereq(self):
2533
    """Verify that the passed name is a valid one.
2534

2535
    """
2536
    hostname = netutils.GetHostname(name=self.op.name,
2537
                                    family=self.cfg.GetPrimaryIPFamily())
2538

    
2539
    new_name = hostname.name
2540
    self.ip = new_ip = hostname.ip
2541
    old_name = self.cfg.GetClusterName()
2542
    old_ip = self.cfg.GetMasterIP()
2543
    if new_name == old_name and new_ip == old_ip:
2544
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2545
                                 " cluster has changed",
2546
                                 errors.ECODE_INVAL)
2547
    if new_ip != old_ip:
2548
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2549
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2550
                                   " reachable on the network" %
2551
                                   new_ip, errors.ECODE_NOTUNIQUE)
2552

    
2553
    self.op.name = new_name
2554

    
2555
  def Exec(self, feedback_fn):
2556
    """Rename the cluster.
2557

2558
    """
2559
    clustername = self.op.name
2560
    ip = self.ip
2561

    
2562
    # shutdown the master IP
2563
    master = self.cfg.GetMasterNode()
2564
    result = self.rpc.call_node_stop_master(master, False)
2565
    result.Raise("Could not disable the master role")
2566

    
2567
    try:
2568
      cluster = self.cfg.GetClusterInfo()
2569
      cluster.cluster_name = clustername
2570
      cluster.master_ip = ip
2571
      self.cfg.Update(cluster, feedback_fn)
2572

    
2573
      # update the known hosts file
2574
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2575
      node_list = self.cfg.GetNodeList()
2576
      try:
2577
        node_list.remove(master)
2578
      except ValueError:
2579
        pass
2580
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
2581
    finally:
2582
      result = self.rpc.call_node_start_master(master, False, False)
2583
      msg = result.fail_msg
2584
      if msg:
2585
        self.LogWarning("Could not re-enable the master role on"
2586
                        " the master, please restart manually: %s", msg)
2587

    
2588
    return clustername
2589

    
2590

    
2591
class LUSetClusterParams(LogicalUnit):
2592
  """Change the parameters of the cluster.
2593

2594
  """
2595
  HPATH = "cluster-modify"
2596
  HTYPE = constants.HTYPE_CLUSTER
2597
  _OP_PARAMS = [
2598
    ("vg_name", None, ht.TMaybeString),
2599
    ("enabled_hypervisors", None,
2600
     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
2601
            ht.TNone)),
2602
    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2603
                              ht.TNone)),
2604
    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
2605
    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2606
                            ht.TNone)),
2607
    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2608
                              ht.TNone)),
2609
    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
2610
    ("uid_pool", None, ht.NoType),
2611
    ("add_uids", None, ht.NoType),
2612
    ("remove_uids", None, ht.NoType),
2613
    ("maintain_node_health", None, ht.TMaybeBool),
2614
    ("prealloc_wipe_disks", None, ht.TMaybeBool),
2615
    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
2616
    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
2617
    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
2618
    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
2619
    ("hidden_os", None, ht.TOr(ht.TListOf(\
2620
          ht.TAnd(ht.TList,
2621
                ht.TIsLength(2),
2622
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2623
          ht.TNone)),
2624
    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
2625
          ht.TAnd(ht.TList,
2626
                ht.TIsLength(2),
2627
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2628
          ht.TNone)),
2629
    ]
2630
  REQ_BGL = False
2631

    
2632
  def CheckArguments(self):
2633
    """Check parameters
2634

2635
    """
2636
    if self.op.uid_pool:
2637
      uidpool.CheckUidPool(self.op.uid_pool)
2638

    
2639
    if self.op.add_uids:
2640
      uidpool.CheckUidPool(self.op.add_uids)
2641

    
2642
    if self.op.remove_uids:
2643
      uidpool.CheckUidPool(self.op.remove_uids)
2644

    
2645
  def ExpandNames(self):
2646
    # FIXME: in the future maybe other cluster params won't require checking on
2647
    # all nodes to be modified.
2648
    self.needed_locks = {
2649
      locking.LEVEL_NODE: locking.ALL_SET,
2650
    }
2651
    self.share_locks[locking.LEVEL_NODE] = 1
2652

    
2653
  def BuildHooksEnv(self):
2654
    """Build hooks env.
2655

2656
    """
2657
    env = {
2658
      "OP_TARGET": self.cfg.GetClusterName(),
2659
      "NEW_VG_NAME": self.op.vg_name,
2660
      }
2661
    mn = self.cfg.GetMasterNode()
2662
    return env, [mn], [mn]
2663

    
2664
  def CheckPrereq(self):
2665
    """Check prerequisites.
2666

2667
    This checks whether the given params don't conflict and
2668
    if the given volume group is valid.
2669

2670
    """
2671
    if self.op.vg_name is not None and not self.op.vg_name:
2672
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
2673
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
2674
                                   " instances exist", errors.ECODE_INVAL)
2675

    
2676
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
2677
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
2678
        raise errors.OpPrereqError("Cannot disable drbd helper while"
2679
                                   " drbd-based instances exist",
2680
                                   errors.ECODE_INVAL)
2681

    
2682
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2683

    
2684
    # if vg_name not None, checks given volume group on all nodes
2685
    if self.op.vg_name:
2686
      vglist = self.rpc.call_vg_list(node_list)
2687
      for node in node_list:
2688
        msg = vglist[node].fail_msg
2689
        if msg:
2690
          # ignoring down node
2691
          self.LogWarning("Error while gathering data on node %s"
2692
                          " (ignoring node): %s", node, msg)
2693
          continue
2694
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2695
                                              self.op.vg_name,
2696
                                              constants.MIN_VG_SIZE)
2697
        if vgstatus:
2698
          raise errors.OpPrereqError("Error on node '%s': %s" %
2699
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2700

    
2701
    if self.op.drbd_helper:
2702
      # checks given drbd helper on all nodes
2703
      helpers = self.rpc.call_drbd_helper(node_list)
2704
      for node in node_list:
2705
        ninfo = self.cfg.GetNodeInfo(node)
2706
        if ninfo.offline:
2707
          self.LogInfo("Not checking drbd helper on offline node %s", node)
2708
          continue
2709
        msg = helpers[node].fail_msg
2710
        if msg:
2711
          raise errors.OpPrereqError("Error checking drbd helper on node"
2712
                                     " '%s': %s" % (node, msg),
2713
                                     errors.ECODE_ENVIRON)
2714
        node_helper = helpers[node].payload
2715
        if node_helper != self.op.drbd_helper:
2716
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
2717
                                     (node, node_helper), errors.ECODE_ENVIRON)
2718

    
2719
    self.cluster = cluster = self.cfg.GetClusterInfo()
2720
    # validate params changes
2721
    if self.op.beparams:
2722
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2723
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2724

    
2725
    if self.op.nicparams:
2726
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2727
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2728
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2729
      nic_errors = []
2730

    
2731
      # check all instances for consistency
2732
      for instance in self.cfg.GetAllInstancesInfo().values():
2733
        for nic_idx, nic in enumerate(instance.nics):
2734
          params_copy = copy.deepcopy(nic.nicparams)
2735
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2736

    
2737
          # check parameter syntax
2738
          try:
2739
            objects.NIC.CheckParameterSyntax(params_filled)
2740
          except errors.ConfigurationError, err:
2741
            nic_errors.append("Instance %s, nic/%d: %s" %
2742
                              (instance.name, nic_idx, err))
2743

    
2744
          # if we're moving instances to routed, check that they have an ip
2745
          target_mode = params_filled[constants.NIC_MODE]
2746
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2747
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2748
                              (instance.name, nic_idx))
2749
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    self.cfg.Update(self.cluster, feedback_fn)


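# Editor's note: the sketch below is illustrative only and not part of the
# original cmdlib.py.  CheckPrereq above layers user-supplied hvparams,
# os_hvp and osparams on top of the cluster-wide defaults in the style of
# objects.FillDict (explicit values win over defaults).  The helper name
# _ExampleFillDefaults is hypothetical.
def _ExampleFillDefaults(defaults, overrides):
  """Return a copy of 'defaults' updated with 'overrides' (sketch only)."""
  filled = dict(defaults)
  filled.update(overrides)
  return filled

# e.g. _ExampleFillDefaults({"kernel_path": "/boot/vmlinuz", "acpi": True},
#                           {"acpi": False})
# -> {"kernel_path": "/boot/vmlinuz", "acpi": False}

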
def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  if myself.name in vm_nodes:
    vm_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
                   ])

  vm_files = set()
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    vm_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    _UploadHelper(lu, dist_nodes, fname)
  for fname in vm_files:
    _UploadHelper(lu, vm_nodes, fname)


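# Editor's note: illustrative sketch, not part of the original module.  It
# mirrors how _RedistributeAncillaryFiles above picks its two target lists:
# every online node except the master gets the generic files, and only the
# vm-capable subset gets the hypervisor ancillary files.  All names below
# are hypothetical.
def _ExampleDistributionTargets(online_nodes, non_vm_capable, master_name):
  """Return (dist_nodes, vm_nodes) computed from plain name lists (sketch)."""
  dist_nodes = [name for name in online_nodes if name != master_name]
  vm_nodes = [name for name in dist_nodes if name not in non_vm_capable]
  return dist_nodes, vm_nodes

# e.g. _ExampleDistributionTargets(["node1", "node2", "node3"],
#                                  ["node3"], "node1")
# -> (["node2", "node3"], ["node2"])

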
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


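# Editor's note: illustrative sketch, not part of the original module.  The
# polling loop in _WaitForSync above tolerates a bounded number of "done but
# degraded" rounds so that a transient DRBD state is not reported as stable.
# The generic helper below keeps only that control flow; poll_fn and its
# (done, degraded) return value are hypothetical.
def _ExamplePollUntilClean(poll_fn, degraded_retries=10):
  """Poll until poll_fn() reports done; retry a few times while degraded."""
  while True:
    done, degraded = poll_fn()
    if done and degraded and degraded_retries > 0:
      # possibly transient degradation: force another round
      degraded_retries -= 1
      continue
    if done:
      return not degraded

# e.g. _ExamplePollUntilClean(lambda: (True, False)) -> True

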
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


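# Editor's note: illustrative sketch, not part of the original module.
# _CheckDiskConsistency above ANDs the health of a device with the health of
# all of its children, recursively.  The toy walk below shows the same
# accumulation over a hypothetical nested dict structure.
def _ExampleAllHealthy(dev):
  """Return True only if 'dev' and all its children are healthy (sketch).

  'dev' is a hypothetical dict: {"healthy": bool, "children": [dev, ...]}.

  """
  result = dev["healthy"]
  for child in dev.get("children", []):
    result = result and _ExampleAllHealthy(child)
  return result

# e.g. _ExampleAllHealthy({"healthy": True,
#                          "children": [{"healthy": False}]}) -> False

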
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _HID = "hidden"
  _BLK = "blacklisted"
  _VLD = "valid"
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
                                   "parameters", "api_versions", _HID, _BLK)

  def CheckArguments(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node.name
                   for node in self.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []
    cluster = self.cfg.GetClusterInfo()

    for os_name in utils.NiceSort(pol.keys()):
      os_data = pol[os_name]
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      is_hid = os_name in cluster.hidden_os
      is_blk = os_name in cluster.blacklisted_os
      if ((self._HID not in self.op.output_fields and is_hid) or
          (self._BLK not in self.op.output_fields and is_blk) or
          (self._VLD not in self.op.output_fields and not valid)):
        continue

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == self._VLD:
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = utils.NiceSort(list(variants))
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        elif field == self._HID:
          val = is_hid
        elif field == self._BLK:
          val = is_blk
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


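# Editor's note: illustrative sketch, not part of the original module.
# _DiagnoseByOS above inverts a per-node RPC result into a per-OS, per-node
# dictionary.  The simplified inversion below works on plain lists of OS
# names instead of the full (path, status, ...) payload tuples; the input
# format is hypothetical.
def _ExampleGroupByOs(node_oses):
  """Invert {node: [os_name, ...]} into {os_name: [node, ...]} (sketch)."""
  by_os = {}
  for node_name, os_names in node_oses.items():
    for os_name in os_names:
      by_os.setdefault(os_name, []).append(node_name)
  return by_os

# e.g. _ExampleGroupByOs({"node1": ["debian-etch"], "node2": ["debian-etch"]})
# -> {"debian-etch": ["node1", "node2"]}

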
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("use_locking", False, ht.TBool),
    ]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained",
                    "master_capable", "vm_capable"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


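# Editor's note: illustrative sketch, not part of the original module.
# LUQueryNodes above pre-computes reverse maps from node names to the
# instances they host, so the per-field loop only does dictionary lookups.
# The input tuples below are hypothetical.
def _ExampleReverseInstanceMaps(instances):
  """Build (node_to_primary, node_to_secondary) from (name, pri, secs)."""
  node_to_primary = {}
  node_to_secondary = {}
  for inst_name, primary, secondaries in instances:
    node_to_primary.setdefault(primary, set()).add(inst_name)
    for secnode in secondaries:
      node_to_secondary.setdefault(secnode, set()).add(inst_name)
  return node_to_primary, node_to_secondary

# e.g. _ExampleReverseInstanceMaps([("inst1", "node1", ["node2"])])
# -> ({"node1": set(["inst1"])}, {"node2": set(["inst1"])})

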
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_PARAMS = [
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


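# Editor's note: illustrative sketch, not part of the original module.  The
# "instance" field above is resolved with a for/else loop: the else branch
# runs only when no instance was found to own the volume.  A minimal version
# of the same idiom, with hypothetical data.
def _ExampleVolumeOwner(lv_by_node, node, vol_name):
  """Return the owning instance name, or "-" if the volume is unowned."""
  for inst_name, node_lvs in lv_by_node.items():
    if vol_name in node_lvs.get(node, []):
      owner = inst_name
      break
  else:
    owner = "-"
  return owner

# e.g. _ExampleVolumeOwner({"inst1": {"node1": ["xenvg/lv1"]}},
#                          "node1", "xenvg/lv1") -> "inst1"

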
class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  _OP_PARAMS = [
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ("name", None, ht.TMaybeString),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


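# Editor's note: illustrative sketch, not part of the original module.
# LUQueryNodeStorage above always asks the backend for the name column (so
# results can be sorted), never asks for the node/type pseudo-fields, and
# then maps field names back to row indices.  The helper below shows that
# bookkeeping on plain lists; all names are hypothetical.
def _ExampleFieldIndexes(requested, name_field="name",
                         pseudo_fields=("node", "type")):
  """Return (rpc_fields, field_idx) for a storage-style query (sketch)."""
  rpc_fields = [field for field in requested if field not in pseudo_fields]
  if name_field not in rpc_fields:
    rpc_fields.insert(0, name_field)
  field_idx = dict((name, idx) for (idx, name) in enumerate(rpc_fields))
  return rpc_fields, field_idx

# e.g. _ExampleFieldIndexes(["size", "node"])
# -> (["name", "size"], {"name": 0, "size": 1})

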
class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_PARAMS = [
    _PNodeName,
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("name", ht.NoDefault, ht.TNonEmptyString),
    ("changes", ht.NoDefault, ht.TDict),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


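# Editor's note: illustrative sketch, not part of the original module.  The
# CheckArguments above rejects any requested change whose key is outside the
# per-storage-type whitelist, using a simple set difference; shown here on
# plain data with hypothetical names.
def _ExampleUnmodifiableKeys(changes, modifiable):
  """Return the set of keys in 'changes' that are not modifiable (sketch)."""
  return set(changes.keys()) - set(modifiable)

# e.g. _ExampleUnmodifiableKeys({"allocatable": False, "size": 10},
#                               frozenset(["allocatable"]))
# -> set(["size"])

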
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("primary_ip", None, ht.NoType),
    ("secondary_ip", None, ht.TMaybeString),
    ("readd", False, ht.TBool),
    ("group", None, ht.TMaybeString),
    ("master_capable", None, ht.TMaybeBool),
    ("vm_capable", None, ht.TMaybeBool),
    ]
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name
    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
3805
                                   " IPv4 address must be given as secondary",
3806
                                   errors.ECODE_INVAL)
3807
      self.op.secondary_ip = primary_ip
3808

    
3809
    secondary_ip = self.op.secondary_ip
3810
    if not netutils.IP4Address.IsValid(secondary_ip):
3811
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
3812
                                 " address" % secondary_ip, errors.ECODE_INVAL)
3813

    
3814
    node_list = cfg.GetNodeList()
3815
    if not self.op.readd and node in node_list:
3816
      raise errors.OpPrereqError("Node %s is already in the configuration" %
3817
                                 node, errors.ECODE_EXISTS)
3818
    elif self.op.readd and node not in node_list:
3819
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
3820
                                 errors.ECODE_NOENT)
3821

    
3822
    self.changed_primary_ip = False
3823

    
3824
    for existing_node_name in node_list:
3825
      existing_node = cfg.GetNodeInfo(existing_node_name)
3826

    
3827
      if self.op.readd and node == existing_node_name:
3828
        if existing_node.secondary_ip != secondary_ip:
3829
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
3830
                                     " address configuration as before",
3831
                                     errors.ECODE_INVAL)
3832
        if existing_node.primary_ip != primary_ip:
3833
          self.changed_primary_ip = True
3834

    
3835
        continue
3836

    
3837
      if (existing_node.primary_ip == primary_ip or
3838
          existing_node.secondary_ip == primary_ip or
3839
          existing_node.primary_ip == secondary_ip or
3840
          existing_node.secondary_ip == secondary_ip):
3841
        raise errors.OpPrereqError("New node ip address(es) conflict with"
3842
                                   " existing node %s" % existing_node.name,
3843
                                   errors.ECODE_NOTUNIQUE)
3844

    
3845
    # After this 'if' block, None is no longer a valid value for the
3846
    # _capable op attributes
3847
    if self.op.readd:
3848
      old_node = self.cfg.GetNodeInfo(node)
3849
      assert old_node is not None, "Can't retrieve locked node %s" % node
3850
      for attr in self._NFLAGS:
3851
        if getattr(self.op, attr) is None:
3852
          setattr(self.op, attr, getattr(old_node, attr))
3853
    else:
3854
      for attr in self._NFLAGS:
3855
        if getattr(self.op, attr) is None:
3856
          setattr(self.op, attr, True)
3857

    
3858
    if self.op.readd and not self.op.vm_capable:
3859
      pri, sec = cfg.GetNodeInstances(node)
3860
      if pri or sec:
3861
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
3862
                                   " flag set to false, but it already holds"
3863
                                   " instances" % node,
3864
                                   errors.ECODE_STATE)
3865

    
3866
    # check that the type of the node (single versus dual homed) is the
3867
    # same as for the master
3868
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
3869
    master_singlehomed = myself.secondary_ip == myself.primary_ip
3870
    newbie_singlehomed = secondary_ip == primary_ip
3871
    if master_singlehomed != newbie_singlehomed:
3872
      if master_singlehomed:
3873
        raise errors.OpPrereqError("The master has no secondary ip but the"
3874
                                   " new node has one",
3875
                                   errors.ECODE_INVAL)
3876
      else:
3877
        raise errors.OpPrereqError("The master has a secondary ip but the"
3878
                                   " new node doesn't have one",
3879
                                   errors.ECODE_INVAL)
3880

    
3881
    # checks reachability
3882
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
3883
      raise errors.OpPrereqError("Node not reachable by ping",
3884
                                 errors.ECODE_ENVIRON)
3885

    
3886
    if not newbie_singlehomed:
3887
      # check reachability from my secondary ip to newbie's secondary ip
3888
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3889
                           source=myself.secondary_ip):
3890
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3891
                                   " based ping to node daemon port",
3892
                                   errors.ECODE_ENVIRON)
3893

    
3894
    if self.op.readd:
3895
      exceptions = [node]
3896
    else:
3897
      exceptions = []
3898

    
3899
    if self.op.master_capable:
3900
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3901
    else:
3902
      self.master_candidate = False
3903

    
3904
    if self.op.readd:
3905
      self.new_node = old_node
3906
    else:
3907
      node_group = cfg.LookupNodeGroup(self.op.group)
3908
      self.new_node = objects.Node(name=node,
3909
                                   primary_ip=primary_ip,
3910
                                   secondary_ip=secondary_ip,
3911
                                   master_candidate=self.master_candidate,
3912
                                   offline=False, drained=False,
3913
                                   group=node_group)
3914

    
3915
  def Exec(self, feedback_fn):
3916
    """Adds the new node to the cluster.
3917

3918
    """
3919
    new_node = self.new_node
3920
    node = new_node.name
3921

    
3922
    # for re-adds, reset the offline/drained/master-candidate flags;
3923
    # we need to reset here, otherwise offline would prevent RPC calls
3924
    # later in the procedure; this also means that if the re-add
3925
    # fails, we are left with a non-offlined, broken node
3926
    if self.op.readd:
3927
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3928
      self.LogInfo("Readding a node, the offline/drained flags were reset")
3929
      # if we demote the node, we do cleanup later in the procedure
3930
      new_node.master_candidate = self.master_candidate
3931
      if self.changed_primary_ip:
3932
        new_node.primary_ip = self.op.primary_ip
3933

    
3934
    # copy the master/vm_capable flags
3935
    for attr in self._NFLAGS:
3936
      setattr(new_node, attr, getattr(self.op, attr))
3937

    
3938
    # notify the user about any possible mc promotion
3939
    if new_node.master_candidate:
3940
      self.LogInfo("Node will be a master candidate")
3941

    
3942
    # check connectivity
3943
    result = self.rpc.call_version([node])[node]
3944
    result.Raise("Can't get version information from node %s" % node)
3945
    if constants.PROTOCOL_VERSION == result.payload:
3946
      logging.info("Communication to node %s fine, sw version %s match",
3947
                   node, result.payload)
3948
    else:
3949
      raise errors.OpExecError("Version mismatch master version %s,"
3950
                               " node version %s" %
3951
                               (constants.PROTOCOL_VERSION, result.payload))
3952

    
3953
    # Add node to our /etc/hosts, and add key to known_hosts
3954
    if self.cfg.GetClusterInfo().modify_etc_hosts:
3955
      master_node = self.cfg.GetMasterNode()
3956
      result = self.rpc.call_etc_hosts_modify(master_node,
3957
                                              constants.ETC_HOSTS_ADD,
3958
                                              self.hostname.name,
3959
                                              self.hostname.ip)
3960
      result.Raise("Can't update hosts file with new host data")
3961

    
3962
    if new_node.secondary_ip != new_node.primary_ip:
3963
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
3964
                               False)
3965

    
3966
    node_verify_list = [self.cfg.GetMasterNode()]
3967
    node_verify_param = {
3968
      constants.NV_NODELIST: [node],
3969
      # TODO: do a node-net-test as well?
3970
    }
3971

    
3972
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3973
                                       self.cfg.GetClusterName())
3974
    for verifier in node_verify_list:
3975
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
3976
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
3977
      if nl_payload:
3978
        for failed in nl_payload:
3979
          feedback_fn("ssh/hostname verification failed"
3980
                      " (checking from %s): %s" %
3981
                      (verifier, nl_payload[failed]))
3982
        raise errors.OpExecError("ssh/hostname verification failed.")
3983

    
3984
    if self.op.readd:
3985
      _RedistributeAncillaryFiles(self)
3986
      self.context.ReaddNode(new_node)
3987
      # make sure we redistribute the config
3988
      self.cfg.Update(new_node, feedback_fn)
3989
      # and make sure the new node will not have old files around
3990
      if not new_node.master_candidate:
3991
        result = self.rpc.call_node_demote_from_mc(new_node.name)
3992
        msg = result.fail_msg
3993
        if msg:
3994
          self.LogWarning("Node failed to demote itself from master"
3995
                          " candidate status: %s" % msg)
3996
    else:
3997
      _RedistributeAncillaryFiles(self, additional_nodes=[node],
3998
                                  additional_vm=self.op.vm_capable)
3999
      self.context.AddNode(new_node, self.proc.GetECId())
4000

    
4001

    
4002
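# Editor's note: illustrative sketch, not part of the original module.
# CheckPrereq above requires the new node to match the master's homing:
# either both are single-homed (primary == secondary IP) or both are
# dual-homed.  The predicate below states that comparison; the names and
# example addresses are hypothetical.
def _ExampleHomingMatches(master_primary, master_secondary,
                          node_primary, node_secondary):
  """Return True if master and new node are both single- or dual-homed."""
  master_singlehomed = master_primary == master_secondary
  newbie_singlehomed = node_primary == node_secondary
  return master_singlehomed == newbie_singlehomed

# e.g. _ExampleHomingMatches("192.0.2.1", "192.0.2.1",
#                            "192.0.2.2", "198.51.100.2") -> False

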
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("master_candidate", None, ht.TMaybeBool),
    ("offline", None, ht.TMaybeBool),
    ("drained", None, ht.TMaybeBool),
    ("auto_promote", False, ht.TBool),
    ("master_capable", None, ht.TMaybeBool),
    ("vm_capable", None, ht.TMaybeBool),
    ("secondary_ip", None, ht.TMaybeString),
    _PForce,
    ]
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (Fals