#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have way too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import ht

import ganeti.masterd.instance # pylint: disable-msg=W0611

# Common opcode attributes

#: output fields for a query operation
_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))


#: the shutdown timeout
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
                     ht.TPositiveInt)

#: the force parameter
_PForce = ("force", False, ht.TBool)

#: a required instance name (for single-instance LUs)
_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)

#: whether to ignore offline nodes
_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)

#: a required node name (for single-node LUs)
_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)

#: the migration type (live/non-live)
_PMigrationMode = ("mode", None,
                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))

#: the obsolete 'live' mode (boolean)
_PMigrationLive = ("live", None, ht.TMaybeBool)
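
# Example (illustrative only, not a real LU in this module): the _P* tuples
# above are meant to be dropped straight into an LU's _OP_PARAMS list, next to
# any LU-specific attributes; the class name and the "custom_timeout"
# parameter below are hypothetical.
#
#   class LUExampleStartup(LogicalUnit):
#     _OP_PARAMS = [
#       _PInstanceName,
#       _PForce,
#       _PIgnoreOfflineNodes,
#       ("custom_timeout", 30, ht.TPositiveInt),  # LU-specific parameter
#       ]
#
# Each entry is (attribute name, default value or ht.NoDefault, type check);
# LogicalUnit.__init__ below fills in missing defaults and validates the types.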


# End types
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)
  @cvar _OP_PARAMS: a list of opcode attributes, the default values
      they should get if not already defined, and types they must match

  """
  HPATH = None
  HTYPE = None
  _OP_PARAMS = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.Log = processor.Log # pylint: disable-msg=C0103
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # The new kind-of-type-system
    op_id = self.op.OP_ID
    for attr_name, aval, test in self._OP_PARAMS:
      if not hasattr(op, attr_name):
        if aval == ht.NoDefault:
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
                                     (op_id, attr_name), errors.ECODE_INVAL)
        else:
          if callable(aval):
            dval = aval()
          else:
            dval = aval
          setattr(self.op, attr_name, dval)
      attr_val = getattr(op, attr_name)
      if test == ht.NoType:
        # no tests here
        continue
      if not callable(test):
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
                                     " given type is not a proper type (%s)" %
                                     (op_id, attr_name, test))
      if not test(attr_val):
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
                                   (op_id, attr_name), errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are to be returned, an empty list (and not None) should
    be used.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the "unused argument" and "could
    # be a function" warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check whether we really have been called with the instance locks
    # held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
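
# Example (illustrative sketch, not shipped code): a minimal concurrent LU
# following the rules from the LogicalUnit docstring could look like the
# following; the class name, hook path and Exec body are hypothetical.
#
#   class LUExampleOperateInstance(LogicalUnit):
#     HPATH = "instance-example"            # hypothetical hook path
#     HTYPE = constants.HTYPE_INSTANCE
#     _OP_PARAMS = [_PInstanceName, _PForce]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def BuildHooksEnv(self):
#       env = _BuildInstanceHookEnvByObject(self, self.instance)
#       nl = [self.cfg.GetMasterNode()]
#       return env, nl, nl
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Doing nothing for %s" % self.op.instance_name)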


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
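
# Example (illustrative, hypothetical names): an LU can delegate its work to
# tasklets by setting self.tasklets in ExpandNames; LogicalUnit.CheckPrereq
# and LogicalUnit.Exec then iterate over them instead of requiring LU-level
# implementations.
#
#   class _ExampleTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       _ExpandInstanceName(self.cfg, self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Processing %s" % self.instance_name)
#
#   # inside some LU's ExpandNames:
#   #   self.tasklets = [_ExampleTasklet(self, name) for name in names]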


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: non-empty list of node names to expand
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is of the wrong type

  """
  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
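
# Example (illustrative): with the defaults (use_default=True, use_none=False)
# a value of constants.VALUE_DEFAULT removes the key so it falls back to the
# cluster default, while other values simply overwrite the old ones. The
# parameter names below are placeholders for this sketch only.
#
#   old = {"mem": 512, "vcpus": 2}
#   new = _GetUpdatedParams(old, {"mem": constants.VALUE_DEFAULT, "vcpus": 4})
#   # new == {"vcpus": 4}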


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node does not support the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()
  return True


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()
  return True


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
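
# Note (illustrative summary): for an instance with one bridged NIC and one
# disk, the function above produces keys such as OP_TARGET, INSTANCE_NAME,
# INSTANCE_PRIMARY, INSTANCE_STATUS ("up"/"down"), INSTANCE_NIC_COUNT,
# INSTANCE_NIC0_MAC, INSTANCE_NIC0_BRIDGE, INSTANCE_DISK_COUNT and
# INSTANCE_DISK0_SIZE, plus one INSTANCE_BE_*/INSTANCE_HV_* entry per backend
# and hypervisor parameter; the hooks runner later prefixes these keys with
# GANETI_ (see BuildHooksEnv above).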


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  variant = objects.OS.GetVariant(name)
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found."
                                 " Please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator.")


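# Example (illustrative, hypothetical slot names): an LU whose opcode has both
# an "iallocator" and a "target_node" attribute would typically call the
# helper above from CheckArguments or ExpandNames:
#
#   def CheckArguments(self):
#     _CheckIAllocatorOrNode(self, "iallocator", "target_node")
#
# On success, exactly one of the two opcode slots ends up set: either the one
# given by the user, or the cluster-wide default iallocator.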

class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    return master


def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)

    
1185

    
1186
class LUVerifyCluster(LogicalUnit):
1187
  """Verifies the cluster status.
1188

1189
  """
1190
  HPATH = "cluster-verify"
1191
  HTYPE = constants.HTYPE_CLUSTER
1192
  _OP_PARAMS = [
1193
    ("skip_checks", ht.EmptyList,
1194
     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1195
    ("verbose", False, ht.TBool),
1196
    ("error_codes", False, ht.TBool),
1197
    ("debug_simulate_errors", False, ht.TBool),
1198
    ]
1199
  REQ_BGL = False
1200

    
1201
  TCLUSTER = "cluster"
1202
  TNODE = "node"
1203
  TINSTANCE = "instance"
1204

    
1205
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1206
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1207
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1208
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1209
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1210
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1211
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1212
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1213
  ENODEDRBD = (TNODE, "ENODEDRBD")
1214
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1215
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1216
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1217
  ENODEHV = (TNODE, "ENODEHV")
1218
  ENODELVM = (TNODE, "ENODELVM")
1219
  ENODEN1 = (TNODE, "ENODEN1")
1220
  ENODENET = (TNODE, "ENODENET")
1221
  ENODEOS = (TNODE, "ENODEOS")
1222
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1223
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1224
  ENODERPC = (TNODE, "ENODERPC")
1225
  ENODESSH = (TNODE, "ENODESSH")
1226
  ENODEVERSION = (TNODE, "ENODEVERSION")
1227
  ENODESETUP = (TNODE, "ENODESETUP")
1228
  ENODETIME = (TNODE, "ENODETIME")
1229

    
1230
  ETYPE_FIELD = "code"
1231
  ETYPE_ERROR = "ERROR"
1232
  ETYPE_WARNING = "WARNING"
1233

    
1234
  class NodeImage(object):
1235
    """A class representing the logical and physical status of a node.
1236

1237
    @type name: string
1238
    @ivar name: the node name to which this object refers
1239
    @ivar volumes: a structure as returned from
1240
        L{ganeti.backend.GetVolumeList} (runtime)
1241
    @ivar instances: a list of running instances (runtime)
1242
    @ivar pinst: list of configured primary instances (config)
1243
    @ivar sinst: list of configured secondary instances (config)
1244
    @ivar sbp: diction of {secondary-node: list of instances} of all peers
1245
        of this node (config)
1246
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1247
    @ivar dfree: free disk, as reported by the node (runtime)
1248
    @ivar offline: the offline status (config)
1249
    @type rpc_fail: boolean
1250
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1251
        not whether the individual keys were correct) (runtime)
1252
    @type lvm_fail: boolean
1253
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1254
    @type hyp_fail: boolean
1255
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1256
    @type ghost: boolean
1257
    @ivar ghost: whether this is a known node or not (config)
1258
    @type os_fail: boolean
1259
    @ivar os_fail: whether the RPC call didn't return valid OS data
1260
    @type oslist: list
1261
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1262
    @type vm_capable: boolean
1263
    @ivar vm_capable: whether the node can host instances
1264

1265
    """
1266
    def __init__(self, offline=False, name=None, vm_capable=True):
1267
      self.name = name
1268
      self.volumes = {}
1269
      self.instances = []
1270
      self.pinst = []
1271
      self.sinst = []
1272
      self.sbp = {}
1273
      self.mfree = 0
1274
      self.dfree = 0
1275
      self.offline = offline
1276
      self.vm_capable = vm_capable
1277
      self.rpc_fail = False
1278
      self.lvm_fail = False
1279
      self.hyp_fail = False
1280
      self.ghost = False
1281
      self.os_fail = False
1282
      self.oslist = {}
1283

    
1284
  def ExpandNames(self):
1285
    self.needed_locks = {
1286
      locking.LEVEL_NODE: locking.ALL_SET,
1287
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1288
    }
1289
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1290

    
1291
  def _Error(self, ecode, item, msg, *args, **kwargs):
1292
    """Format an error message.
1293

1294
    Based on the opcode's error_codes parameter, either format a
1295
    parseable error code, or a simpler error string.
1296

1297
    This must be called only from Exec and functions called from Exec.
1298

1299
    """
1300
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1301
    itype, etxt = ecode
1302
    # first complete the msg
1303
    if args:
1304
      msg = msg % args
1305
    # then format the whole message
1306
    if self.op.error_codes:
1307
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1308
    else:
1309
      if item:
1310
        item = " " + item
1311
      else:
1312
        item = ""
1313
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1314
    # and finally report it via the feedback_fn
1315
    self._feedback_fn("  - %s" % msg)
1316

    
1317
  def _ErrorIf(self, cond, *args, **kwargs):
1318
    """Log an error message if the passed condition is True.
1319

1320
    """
1321
    cond = bool(cond) or self.op.debug_simulate_errors
1322
    if cond:
1323
      self._Error(*args, **kwargs)
1324
    # do not mark the operation as failed for WARN cases only
1325
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1326
      self.bad = self.bad or cond
1327

    
1328
  def _VerifyNode(self, ninfo, nresult):
1329
    """Perform some basic validation on data returned from a node.
1330

1331
      - check the result data structure is well formed and has all the
1332
        mandatory fields
1333
      - check ganeti version
1334

1335
    @type ninfo: L{objects.Node}
1336
    @param ninfo: the node to check
1337
    @param nresult: the results from the node
1338
    @rtype: boolean
1339
    @return: whether overall this call was successful (and we can expect
1340
         reasonable values in the respose)
1341

1342
    """
1343
    node = ninfo.name
1344
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1345

    
1346
    # main result, nresult should be a non-empty dict
1347
    test = not nresult or not isinstance(nresult, dict)
1348
    _ErrorIf(test, self.ENODERPC, node,
1349
                  "unable to verify node: no data returned")
1350
    if test:
1351
      return False
1352

    
1353
    # compares ganeti version
1354
    local_version = constants.PROTOCOL_VERSION
1355
    remote_version = nresult.get("version", None)
1356
    test = not (remote_version and
1357
                isinstance(remote_version, (list, tuple)) and
1358
                len(remote_version) == 2)
1359
    _ErrorIf(test, self.ENODERPC, node,
1360
             "connection to node returned invalid data")
1361
    if test:
1362
      return False
1363

    
1364
    test = local_version != remote_version[0]
1365
    _ErrorIf(test, self.ENODEVERSION, node,
1366
             "incompatible protocol versions: master %s,"
1367
             " node %s", local_version, remote_version[0])
1368
    if test:
1369
      return False
1370

    
1371
    # node seems compatible, we can actually try to look into its results
1372

    
1373
    # full package version
1374
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1375
                  self.ENODEVERSION, node,
1376
                  "software version mismatch: master %s, node %s",
1377
                  constants.RELEASE_VERSION, remote_version[1],
1378
                  code=self.ETYPE_WARNING)
1379

    
1380
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1381
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1382
      for hv_name, hv_result in hyp_result.iteritems():
1383
        test = hv_result is not None
1384
        _ErrorIf(test, self.ENODEHV, node,
1385
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1386

    
1387
    test = nresult.get(constants.NV_NODESETUP,
1388
                           ["Missing NODESETUP results"])
1389
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1390
             "; ".join(test))
1391

    
1392
    return True
1393

    
1394
  def _VerifyNodeTime(self, ninfo, nresult,
1395
                      nvinfo_starttime, nvinfo_endtime):
1396
    """Check the node time.
1397

1398
    @type ninfo: L{objects.Node}
1399
    @param ninfo: the node to check
1400
    @param nresult: the remote results for the node
1401
    @param nvinfo_starttime: the start time of the RPC call
1402
    @param nvinfo_endtime: the end time of the RPC call
1403

1404
    """
1405
    node = ninfo.name
1406
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1407

    
1408
    ntime = nresult.get(constants.NV_TIME, None)
1409
    try:
1410
      ntime_merged = utils.MergeTime(ntime)
1411
    except (ValueError, TypeError):
1412
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1413
      return
1414

    
1415
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1416
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1417
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1418
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1419
    else:
1420
      ntime_diff = None
1421

    
1422
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1423
             "Node time diverges by at least %s from master node time",
1424
             ntime_diff)
1425

    
1426
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1427
    """Check the node time.
1428

1429
    @type ninfo: L{objects.Node}
1430
    @param ninfo: the node to check
1431
    @param nresult: the remote results for the node
1432
    @param vg_name: the configured VG name
1433

1434
    """
1435
    if vg_name is None:
1436
      return
1437

    
1438
    node = ninfo.name
1439
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1440

    
1441
    # checks vg existence and size > 20G
1442
    vglist = nresult.get(constants.NV_VGLIST, None)
1443
    test = not vglist
1444
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1445
    if not test:
1446
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1447
                                            constants.MIN_VG_SIZE)
1448
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1449

    
1450
    # check pv names
1451
    pvlist = nresult.get(constants.NV_PVLIST, None)
1452
    test = pvlist is None
1453
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1454
    if not test:
1455
      # check that ':' is not present in PV names, since it's a
1456
      # special character for lvcreate (denotes the range of PEs to
1457
      # use on the PV)
1458
      for _, pvname, owner_vg in pvlist:
1459
        test = ":" in pvname
1460
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1461
                 " '%s' of VG '%s'", pvname, owner_vg)
1462

    
1463
  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

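  # The connectivity payloads checked above (NV_NODELIST, NV_NODENETTEST) are
  # mappings of failing peer name to error message, so an empty mapping means
  # every tested peer was reachable.
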
  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      _ErrorIf(instanceconfig.admin_up and not success,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

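  # Shape of the flattening above, with hypothetical data: a diskstatus of
  # {"node1": [(True, st0), (False, "err")]} becomes
  # [("node1", True, st0, 0), ("node1", False, "err", 1)], i.e. one
  # (node, success, status, disk_index) tuple per disk.
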
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate failovers should"
                      " peer node %s fail", prinode)

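  # Worked example for the N+1 check above (hypothetical values): if this
  # node is secondary for two auto-balanced instances whose primary is
  # "nodeA", with BE_MEMORY values of 512 and 1024, needed_mem is 1536; an
  # mfree of 1024 would then trigger the ENODEN1 error for peer node "nodeA".
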
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                       master_files):
    """Verifies and computes the node required file checksums.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param master_files: list of files that only masters should have

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_cksum = nresult.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if test:
      return

    for file_name in file_list:
      node_is_mc = ninfo.master_candidate
      must_have = (file_name not in master_files) or node_is_mc
      # missing
      test1 = file_name not in remote_cksum
      # invalid checksum
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
      # existing and good
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
               "file '%s' missing", file_name)
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
               "file '%s' has wrong checksum", file_name)
      # not candidate and this is not a must-have file
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist on non master"
               " candidates (and the file is outdated)", file_name)
      # all good, except non-master/non-must have combination
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist"
               " on non master candidates", file_name)

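  # Summary of the decision table above: for a file the node must have
  # (must_have), being missing (test1) or having a wrong checksum (test2) is
  # an error; for a master-only file on a non-candidate, any presence is
  # flagged, with test2 additionally noting that the copy is outdated.
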
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

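  # Illustration for the minor checks above (hypothetical data): with
  # node_drbd = {0: ("inst1", True), 1: ("inst2", False)} and
  # used_minors = [0, 7], minor 1 raises no error (inst2 is not admin_up),
  # while minor 7 is reported as an unallocated minor in use.
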
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

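  # Each NV_OSLIST entry unpacked above is a 7-element list of
  # (name, path, status, diagnose, variants, parameters, api_versions); the
  # resulting nimg.oslist maps each OS name to a list of
  # (path, status, diagnose, set(variants), set(parameters), set(api_ver))
  # tuples, one per occurrence of that OS on the node.
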
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # this will be caught in the backend too
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
               and not f_var, self.ENODEOS, node,
               "OS %s with API at least %d does not declare any variant",
               os_name, constants.OS_API_V15)
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", f_param, b_param)]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s %s differs from reference node %s: %s vs. %s",
                 kind, os_name, base.name,
                 utils.CommaJoin(a), utils.CommaJoin(b))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    node_disks = {}
    node_disks_devonly = {}

    for nname in nodelist:
      disks = [(inst, disk)
               for instlist in [node_image[nname].pinst,
                                node_image[nname].sinst]
               for inst in instlist
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      if nres.offline:
        # Ignore offline node
        continue

      disks = node_disks[nname]

      msg = nres.fail_msg
      _ErrorIf(msg, self.ENODERPC, nname,
               "while getting disk information: %s", nres.fail_msg)
      if msg:
        # No data from this node
        data = len(disks) * [None]
      else:
        data = nres.payload

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())

    return instdisk

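  # Shape of the mapping returned above (hypothetical example):
  # instdisk = {"inst1": {"node1": [status0, status1]}}, i.e. per instance
  # and per node one status entry per disk (None when the RPC failed);
  # _VerifyInstance later consumes each entry as a (success, status) pair.
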
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run in the post phase only; a hook failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
1997
    """Verify integrity of cluster, performing various test on nodes.
1998

1999
    """
2000
    self.bad = False
2001
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2002
    verbose = self.op.verbose
2003
    self._feedback_fn = feedback_fn
2004
    feedback_fn("* Verifying global settings")
2005
    for msg in self.cfg.VerifyConfig():
2006
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2007

    
2008
    # Check the cluster certificates
2009
    for cert_filename in constants.ALL_CERT_FILES:
2010
      (errcode, msg) = _VerifyCertificate(cert_filename)
2011
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2012

    
2013
    vg_name = self.cfg.GetVGName()
2014
    drbd_helper = self.cfg.GetDRBDHelper()
2015
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2016
    cluster = self.cfg.GetClusterInfo()
2017
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2018
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2019
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2020
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2021
                        for iname in instancelist)
2022
    i_non_redundant = [] # Non redundant instances
2023
    i_non_a_balanced = [] # Non auto-balanced instances
2024
    n_offline = 0 # Count of offline nodes
2025
    n_drained = 0 # Count of nodes being drained
2026
    node_vol_should = {}
2027

    
2028
    # FIXME: verify OS list
2029
    # do local checksums
2030
    master_files = [constants.CLUSTER_CONF_FILE]
2031
    master_node = self.master_node = self.cfg.GetMasterNode()
2032
    master_ip = self.cfg.GetMasterIP()
2033

    
2034
    file_names = ssconf.SimpleStore().GetFileList()
2035
    file_names.extend(constants.ALL_CERT_FILES)
2036
    file_names.extend(master_files)
2037
    if cluster.modify_etc_hosts:
2038
      file_names.append(constants.ETC_HOSTS)
2039

    
2040
    local_checksums = utils.FingerprintFiles(file_names)
2041

    
2042
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2043
    node_verify_param = {
2044
      constants.NV_FILELIST: file_names,
2045
      constants.NV_NODELIST: [node.name for node in nodeinfo
2046
                              if not node.offline],
2047
      constants.NV_HYPERVISOR: hypervisors,
2048
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2049
                                  node.secondary_ip) for node in nodeinfo
2050
                                 if not node.offline],
2051
      constants.NV_INSTANCELIST: hypervisors,
2052
      constants.NV_VERSION: None,
2053
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2054
      constants.NV_NODESETUP: None,
2055
      constants.NV_TIME: None,
2056
      constants.NV_MASTERIP: (master_node, master_ip),
2057
      constants.NV_OSLIST: None,
2058
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2059
      }
2060

    
2061
    if vg_name is not None:
2062
      node_verify_param[constants.NV_VGLIST] = None
2063
      node_verify_param[constants.NV_LVLIST] = vg_name
2064
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2065
      node_verify_param[constants.NV_DRBDLIST] = None
2066

    
2067
    if drbd_helper:
2068
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2069

    
2070
    # Build our expected cluster state
2071
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2072
                                                 name=node.name,
2073
                                                 vm_capable=node.vm_capable))
2074
                      for node in nodeinfo)
2075

    
2076
    for instance in instancelist:
2077
      inst_config = instanceinfo[instance]
2078

    
2079
      for nname in inst_config.all_nodes:
2080
        if nname not in node_image:
2081
          # ghost node
2082
          gnode = self.NodeImage(name=nname)
2083
          gnode.ghost = True
2084
          node_image[nname] = gnode
2085

    
2086
      inst_config.MapLVsByNode(node_vol_should)
2087

    
2088
      pnode = inst_config.primary_node
2089
      node_image[pnode].pinst.append(instance)
2090

    
2091
      for snode in inst_config.secondary_nodes:
2092
        nimg = node_image[snode]
2093
        nimg.sinst.append(instance)
2094
        if pnode not in nimg.sbp:
2095
          nimg.sbp[pnode] = []
2096
        nimg.sbp[pnode].append(instance)
2097

    
2098
    # At this point, we have the in-memory data structures complete,
2099
    # except for the runtime information, which we'll gather next
2100

    
2101
    # Due to the way our RPC system works, exact response times cannot be
2102
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2103
    # time before and after executing the request, we can at least have a time
2104
    # window.
2105
    nvinfo_starttime = time.time()
2106
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2107
                                           self.cfg.GetClusterName())
2108
    nvinfo_endtime = time.time()
2109

    
2110
    all_drbd_map = self.cfg.ComputeDRBDMap()
2111

    
2112
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2113
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2114

    
2115
    feedback_fn("* Verifying node status")
2116

    
2117
    refos_img = None
2118

    
2119
    for node_i in nodeinfo:
2120
      node = node_i.name
2121
      nimg = node_image[node]
2122

    
2123
      if node_i.offline:
2124
        if verbose:
2125
          feedback_fn("* Skipping offline node %s" % (node,))
2126
        n_offline += 1
2127
        continue
2128

    
2129
      if node == master_node:
2130
        ntype = "master"
2131
      elif node_i.master_candidate:
2132
        ntype = "master candidate"
2133
      elif node_i.drained:
2134
        ntype = "drained"
2135
        n_drained += 1
2136
      else:
2137
        ntype = "regular"
2138
      if verbose:
2139
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2140

    
2141
      msg = all_nvinfo[node].fail_msg
2142
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2143
      if msg:
2144
        nimg.rpc_fail = True
2145
        continue
2146

    
2147
      nresult = all_nvinfo[node].payload
2148

    
2149
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2150
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2151
      self._VerifyNodeNetwork(node_i, nresult)
2152
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2153
                            master_files)
2154

    
2155
      if nimg.vm_capable:
2156
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2157
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2158
                             all_drbd_map)
2159

    
2160
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2161
        self._UpdateNodeInstances(node_i, nresult, nimg)
2162
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2163
        self._UpdateNodeOS(node_i, nresult, nimg)
2164
        if not nimg.os_fail:
2165
          if refos_img is None:
2166
            refos_img = nimg
2167
          self._VerifyNodeOS(node_i, nimg, refos_img)
2168

    
2169
    feedback_fn("* Verifying instance status")
2170
    for instance in instancelist:
2171
      if verbose:
2172
        feedback_fn("* Verifying instance %s" % instance)
2173
      inst_config = instanceinfo[instance]
2174
      self._VerifyInstance(instance, inst_config, node_image,
2175
                           instdisk[instance])
2176
      inst_nodes_offline = []
2177

    
2178
      pnode = inst_config.primary_node
2179
      pnode_img = node_image[pnode]
2180
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2181
               self.ENODERPC, pnode, "instance %s, connection to"
2182
               " primary node failed", instance)
2183

    
2184
      if pnode_img.offline:
2185
        inst_nodes_offline.append(pnode)
2186

    
2187
      # If the instance is non-redundant we cannot survive losing its primary
2188
      # node, so we are not N+1 compliant. On the other hand we have no disk
2189
      # templates with more than one secondary so that situation is not well
2190
      # supported either.
2191
      # FIXME: does not support file-backed instances
2192
      if not inst_config.secondary_nodes:
2193
        i_non_redundant.append(instance)
2194
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2195
               instance, "instance has multiple secondary nodes: %s",
2196
               utils.CommaJoin(inst_config.secondary_nodes),
2197
               code=self.ETYPE_WARNING)
2198

    
2199
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2200
        i_non_a_balanced.append(instance)
2201

    
2202
      for snode in inst_config.secondary_nodes:
2203
        s_img = node_image[snode]
2204
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2205
                 "instance %s, connection to secondary node failed", instance)
2206

    
2207
        if s_img.offline:
2208
          inst_nodes_offline.append(snode)
2209

    
2210
      # warn that the instance lives on offline nodes
2211
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2212
               "instance lives on offline node(s) %s",
2213
               utils.CommaJoin(inst_nodes_offline))
2214
      # ... or ghost/non-vm_capable nodes
2215
      for node in inst_config.all_nodes:
2216
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2217
                 "instance lives on ghost node %s", node)
2218
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2219
                 instance, "instance lives on non-vm_capable node %s", node)
2220

    
2221
    feedback_fn("* Verifying orphan volumes")
2222
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2223
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2224

    
2225
    feedback_fn("* Verifying orphan instances")
2226
    self._VerifyOrphanInstances(instancelist, node_image)
2227

    
2228
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2229
      feedback_fn("* Verifying N+1 Memory redundancy")
2230
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2231

    
2232
    feedback_fn("* Other Notes")
2233
    if i_non_redundant:
2234
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2235
                  % len(i_non_redundant))
2236

    
2237
    if i_non_a_balanced:
2238
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2239
                  % len(i_non_a_balanced))
2240

    
2241
    if n_offline:
2242
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2243

    
2244
    if n_drained:
2245
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2246

    
2247
    return not self.bad
2248

    
2249
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2250
    """Analyze the post-hooks' result
2251

2252
    This method analyses the hook result, handles it, and sends some
2253
    nicely-formatted feedback back to the user.
2254

2255
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2256
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2257
    @param hooks_results: the results of the multi-node hooks rpc call
2258
    @param feedback_fn: function used to send feedback back to the caller
2259
    @param lu_result: previous Exec result
2260
    @return: the new Exec result, based on the previous result
2261
        and hook results
2262

2263
    """
2264
    # We only really run POST phase hooks, and are only interested in
2265
    # their results
2266
    if phase == constants.HOOKS_PHASE_POST:
2267
      # Used to change hooks' output to proper indentation
2268
      indent_re = re.compile('^', re.M)
2269
      feedback_fn("* Hooks Results")
2270
      assert hooks_results, "invalid result from hooks"
2271

    
2272
      for node_name in hooks_results:
2273
        res = hooks_results[node_name]
2274
        msg = res.fail_msg
2275
        test = msg and not res.offline
2276
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2277
                      "Communication failure in hooks execution: %s", msg)
2278
        if res.offline or msg:
2279
          # No need to investigate payload if node is offline or gave an error.
2280
          # override manually lu_result here as _ErrorIf only
2281
          # overrides self.bad
2282
          lu_result = 1
2283
          continue
2284
        for script, hkr, output in res.payload:
2285
          test = hkr == constants.HKR_FAIL
2286
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2287
                        "Script %s failed, output:", script)
2288
          if test:
2289
            output = indent_re.sub('      ', output)
2290
            feedback_fn("%s" % output)
2291
            lu_result = 0
2292

    
2293
      return lu_result
2294

    
2295

    
2296
class LUVerifyDisks(NoHooksLU):
2297
  """Verifies the cluster disks status.
2298

2299
  """
2300
  REQ_BGL = False
2301

    
2302
  def ExpandNames(self):
2303
    self.needed_locks = {
2304
      locking.LEVEL_NODE: locking.ALL_SET,
2305
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2306
    }
2307
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2308

    
2309
  def Exec(self, feedback_fn):
2310
    """Verify integrity of cluster disks.
2311

2312
    @rtype: tuple of three items
2313
    @return: a tuple of (dict of node-to-node_error, list of instances
2314
        which need activate-disks, dict of instance: (node, volume) for
2315
        missing volumes)
2316

2317
    """
2318
    result = res_nodes, res_instances, res_missing = {}, [], {}
2319

    
2320
    vg_name = self.cfg.GetVGName()
2321
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2322
    instances = [self.cfg.GetInstanceInfo(name)
2323
                 for name in self.cfg.GetInstanceList()]
2324

    
2325
    nv_dict = {}
2326
    for inst in instances:
2327
      inst_lvs = {}
2328
      if (not inst.admin_up or
2329
          inst.disk_template not in constants.DTS_NET_MIRROR):
2330
        continue
2331
      inst.MapLVsByNode(inst_lvs)
2332
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2333
      for node, vol_list in inst_lvs.iteritems():
2334
        for vol in vol_list:
2335
          nv_dict[(node, vol)] = inst
2336

    
2337
    if not nv_dict:
2338
      return result
2339

    
2340
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
2341

    
2342
    for node in nodes:
2343
      # node_volume
2344
      node_res = node_lvs[node]
2345
      if node_res.offline:
2346
        continue
2347
      msg = node_res.fail_msg
2348
      if msg:
2349
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2350
        res_nodes[node] = msg
2351
        continue
2352

    
2353
      lvs = node_res.payload
2354
      for lv_name, (_, _, lv_online) in lvs.items():
2355
        inst = nv_dict.pop((node, lv_name), None)
2356
        if (not lv_online and inst is not None
2357
            and inst.name not in res_instances):
2358
          res_instances.append(inst.name)
2359

    
2360
    # any leftover items in nv_dict are missing LVs, let's arrange the
2361
    # data better
2362
    for key, inst in nv_dict.iteritems():
2363
      if inst.name not in res_missing:
2364
        res_missing[inst.name] = []
2365
      res_missing[inst.name].append(key)
2366

    
2367
    return result
2368

    
2369

    
2370
class LURepairDiskSizes(NoHooksLU):
2371
  """Verifies the cluster disks sizes.
2372

2373
  """
2374
  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
2375
  REQ_BGL = False
2376

    
2377
  def ExpandNames(self):
2378
    if self.op.instances:
2379
      self.wanted_names = []
2380
      for name in self.op.instances:
2381
        full_name = _ExpandInstanceName(self.cfg, name)
2382
        self.wanted_names.append(full_name)
2383
      self.needed_locks = {
2384
        locking.LEVEL_NODE: [],
2385
        locking.LEVEL_INSTANCE: self.wanted_names,
2386
        }
2387
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2388
    else:
2389
      self.wanted_names = None
2390
      self.needed_locks = {
2391
        locking.LEVEL_NODE: locking.ALL_SET,
2392
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2393
        }
2394
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2395

    
2396
  def DeclareLocks(self, level):
2397
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2398
      self._LockInstancesNodes(primary_only=True)
2399

    
2400
  def CheckPrereq(self):
2401
    """Check prerequisites.
2402

2403
    This only checks the optional instance list against the existing names.
2404

2405
    """
2406
    if self.wanted_names is None:
2407
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2408

    
2409
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2410
                             in self.wanted_names]
2411

    
2412
  def _EnsureChildSizes(self, disk):
2413
    """Ensure children of the disk have the needed disk size.
2414

2415
    This is valid mainly for DRBD8 and fixes an issue where the
2416
    children have a smaller disk size.
2417

2418
    @param disk: an L{ganeti.objects.Disk} object
2419

2420
    """
2421
    if disk.dev_type == constants.LD_DRBD8:
2422
      assert disk.children, "Empty children for DRBD8?"
2423
      fchild = disk.children[0]
2424
      mismatch = fchild.size < disk.size
2425
      if mismatch:
2426
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2427
                     fchild.size, disk.size)
2428
        fchild.size = disk.size
2429

    
2430
      # and we recurse on this child only, not on the metadev
2431
      return self._EnsureChildSizes(fchild) or mismatch
2432
    else:
2433
      return False
2434

    
2435
  def Exec(self, feedback_fn):
2436
    """Verify the size of cluster disks.
2437

2438
    """
2439
    # TODO: check child disks too
2440
    # TODO: check differences in size between primary/secondary nodes
2441
    per_node_disks = {}
2442
    for instance in self.wanted_instances:
2443
      pnode = instance.primary_node
2444
      if pnode not in per_node_disks:
2445
        per_node_disks[pnode] = []
2446
      for idx, disk in enumerate(instance.disks):
2447
        per_node_disks[pnode].append((instance, idx, disk))
2448

    
2449
    changed = []
2450
    for node, dskl in per_node_disks.items():
2451
      newl = [v[2].Copy() for v in dskl]
2452
      for dsk in newl:
2453
        self.cfg.SetDiskID(dsk, node)
2454
      result = self.rpc.call_blockdev_getsizes(node, newl)
2455
      if result.fail_msg:
2456
        self.LogWarning("Failure in blockdev_getsizes call to node"
2457
                        " %s, ignoring", node)
2458
        continue
2459
      if len(result.data) != len(dskl):
2460
        self.LogWarning("Invalid result from node %s, ignoring node results",
2461
                        node)
2462
        continue
2463
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2464
        if size is None:
2465
          self.LogWarning("Disk %d of instance %s did not return size"
2466
                          " information, ignoring", idx, instance.name)
2467
          continue
2468
        if not isinstance(size, (int, long)):
2469
          self.LogWarning("Disk %d of instance %s did not return valid"
2470
                          " size information, ignoring", idx, instance.name)
2471
          continue
2472
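        # The node reports the size in bytes; the right shift by 20 divides
        # by 2**20, yielding MiB as recorded in disk.size (for example,
        # 10737418240 >> 20 == 10240).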
        size = size >> 20
2473
        if size != disk.size:
2474
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2475
                       " correcting: recorded %d, actual %d", idx,
2476
                       instance.name, disk.size, size)
2477
          disk.size = size
2478
          self.cfg.Update(instance, feedback_fn)
2479
          changed.append((instance.name, idx, size))
2480
        if self._EnsureChildSizes(disk):
2481
          self.cfg.Update(instance, feedback_fn)
2482
          changed.append((instance.name, idx, disk.size))
2483
    return changed
2484

    
2485

    
2486
class LURenameCluster(LogicalUnit):
2487
  """Rename the cluster.
2488

2489
  """
2490
  HPATH = "cluster-rename"
2491
  HTYPE = constants.HTYPE_CLUSTER
2492
  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
2493

    
2494
  def BuildHooksEnv(self):
2495
    """Build hooks env.
2496

2497
    """
2498
    env = {
2499
      "OP_TARGET": self.cfg.GetClusterName(),
2500
      "NEW_NAME": self.op.name,
2501
      }
2502
    mn = self.cfg.GetMasterNode()
2503
    all_nodes = self.cfg.GetNodeList()
2504
    return env, [mn], all_nodes
2505

    
2506
  def CheckPrereq(self):
2507
    """Verify that the passed name is a valid one.
2508

2509
    """
2510
    hostname = netutils.GetHostname(name=self.op.name,
2511
                                    family=self.cfg.GetPrimaryIPFamily())
2512

    
2513
    new_name = hostname.name
2514
    self.ip = new_ip = hostname.ip
2515
    old_name = self.cfg.GetClusterName()
2516
    old_ip = self.cfg.GetMasterIP()
2517
    if new_name == old_name and new_ip == old_ip:
2518
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2519
                                 " cluster has changed",
2520
                                 errors.ECODE_INVAL)
2521
    if new_ip != old_ip:
2522
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2523
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2524
                                   " reachable on the network" %
2525
                                   new_ip, errors.ECODE_NOTUNIQUE)
2526

    
2527
    self.op.name = new_name
2528

    
2529
  def Exec(self, feedback_fn):
2530
    """Rename the cluster.
2531

2532
    """
2533
    clustername = self.op.name
2534
    ip = self.ip
2535

    
2536
    # shutdown the master IP
2537
    master = self.cfg.GetMasterNode()
2538
    result = self.rpc.call_node_stop_master(master, False)
2539
    result.Raise("Could not disable the master role")
2540

    
2541
    try:
2542
      cluster = self.cfg.GetClusterInfo()
2543
      cluster.cluster_name = clustername
2544
      cluster.master_ip = ip
2545
      self.cfg.Update(cluster, feedback_fn)
2546

    
2547
      # update the known hosts file
2548
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2549
      node_list = self.cfg.GetNodeList()
2550
      try:
2551
        node_list.remove(master)
2552
      except ValueError:
2553
        pass
2554
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
2555
    finally:
2556
      result = self.rpc.call_node_start_master(master, False, False)
2557
      msg = result.fail_msg
2558
      if msg:
2559
        self.LogWarning("Could not re-enable the master role on"
2560
                        " the master, please restart manually: %s", msg)
2561

    
2562
    return clustername
2563

    
2564

    
2565
class LUSetClusterParams(LogicalUnit):
2566
  """Change the parameters of the cluster.
2567

2568
  """
2569
  HPATH = "cluster-modify"
2570
  HTYPE = constants.HTYPE_CLUSTER
2571
  _OP_PARAMS = [
2572
    ("vg_name", None, ht.TMaybeString),
2573
    ("enabled_hypervisors", None,
2574
     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
2575
            ht.TNone)),
2576
    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2577
                              ht.TNone)),
2578
    ("beparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2579
                              ht.TNone)),
2580
    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2581
                            ht.TNone)),
2582
    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2583
                              ht.TNone)),
2584
    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
2585
    ("uid_pool", None, ht.NoType),
2586
    ("add_uids", None, ht.NoType),
2587
    ("remove_uids", None, ht.NoType),
2588
    ("maintain_node_health", None, ht.TMaybeBool),
2589
    ("prealloc_wipe_disks", None, ht.TMaybeBool),
2590
    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
2591
    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
2592
    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
2593
    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
2594
    ("hidden_os", None, ht.TOr(ht.TListOf(\
2595
          ht.TAnd(ht.TList,
2596
                ht.TIsLength(2),
2597
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2598
          ht.TNone)),
2599
    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
2600
          ht.TAnd(ht.TList,
2601
                ht.TIsLength(2),
2602
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2603
          ht.TNone)),
2604
    ]
2605
  REQ_BGL = False
2606

    
2607
  def CheckArguments(self):
2608
    """Check parameters
2609

2610
    """
2611
    if self.op.uid_pool:
2612
      uidpool.CheckUidPool(self.op.uid_pool)
2613

    
2614
    if self.op.add_uids:
2615
      uidpool.CheckUidPool(self.op.add_uids)
2616

    
2617
    if self.op.remove_uids:
2618
      uidpool.CheckUidPool(self.op.remove_uids)
2619

    
2620
  def ExpandNames(self):
2621
    # FIXME: in the future maybe other cluster params won't require checking on
2622
    # all nodes to be modified.
2623
    self.needed_locks = {
2624
      locking.LEVEL_NODE: locking.ALL_SET,
2625
    }
2626
    self.share_locks[locking.LEVEL_NODE] = 1
2627

    
2628
  def BuildHooksEnv(self):
2629
    """Build hooks env.
2630

2631
    """
2632
    env = {
2633
      "OP_TARGET": self.cfg.GetClusterName(),
2634
      "NEW_VG_NAME": self.op.vg_name,
2635
      }
2636
    mn = self.cfg.GetMasterNode()
2637
    return env, [mn], [mn]
2638

    
2639
  def CheckPrereq(self):
2640
    """Check prerequisites.
2641

2642
    This checks whether the given params don't conflict and
2643
    if the given volume group is valid.
2644

2645
    """
2646
    if self.op.vg_name is not None and not self.op.vg_name:
2647
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
2648
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
2649
                                   " instances exist", errors.ECODE_INVAL)
2650

    
2651
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
2652
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
2653
        raise errors.OpPrereqError("Cannot disable drbd helper while"
2654
                                   " drbd-based instances exist",
2655
                                   errors.ECODE_INVAL)
2656

    
2657
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2658

    
2659
    # if vg_name not None, checks given volume group on all nodes
2660
    if self.op.vg_name:
2661
      vglist = self.rpc.call_vg_list(node_list)
2662
      for node in node_list:
2663
        msg = vglist[node].fail_msg
2664
        if msg:
2665
          # ignoring down node
2666
          self.LogWarning("Error while gathering data on node %s"
2667
                          " (ignoring node): %s", node, msg)
2668
          continue
2669
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2670
                                              self.op.vg_name,
2671
                                              constants.MIN_VG_SIZE)
2672
        if vgstatus:
2673
          raise errors.OpPrereqError("Error on node '%s': %s" %
2674
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2675

    
2676
    if self.op.drbd_helper:
2677
      # checks given drbd helper on all nodes
2678
      helpers = self.rpc.call_drbd_helper(node_list)
2679
      for node in node_list:
2680
        ninfo = self.cfg.GetNodeInfo(node)
2681
        if ninfo.offline:
2682
          self.LogInfo("Not checking drbd helper on offline node %s", node)
2683
          continue
2684
        msg = helpers[node].fail_msg
2685
        if msg:
2686
          raise errors.OpPrereqError("Error checking drbd helper on node"
2687
                                     " '%s': %s" % (node, msg),
2688
                                     errors.ECODE_ENVIRON)
2689
        node_helper = helpers[node].payload
2690
        if node_helper != self.op.drbd_helper:
2691
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
2692
                                     (node, node_helper), errors.ECODE_ENVIRON)
2693

    
2694
    self.cluster = cluster = self.cfg.GetClusterInfo()
2695
    # validate params changes
2696
    if self.op.beparams:
2697
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2698
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2699

    
2700
    if self.op.nicparams:
2701
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2702
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2703
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2704
      nic_errors = []
2705

    
2706
      # check all instances for consistency
2707
      for instance in self.cfg.GetAllInstancesInfo().values():
2708
        for nic_idx, nic in enumerate(instance.nics):
2709
          params_copy = copy.deepcopy(nic.nicparams)
2710
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2711

    
2712
          # check parameter syntax
2713
          try:
2714
            objects.NIC.CheckParameterSyntax(params_filled)
2715
          except errors.ConfigurationError, err:
2716
            nic_errors.append("Instance %s, nic/%d: %s" %
2717
                              (instance.name, nic_idx, err))
2718

    
2719
          # if we're moving instances to routed, check that they have an ip
2720
          target_mode = params_filled[constants.NIC_MODE]
2721
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2722
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2723
                              (instance.name, nic_idx))
2724
      if nic_errors:
2725
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2726
                                   "\n".join(nic_errors))
2727

    
2728
    # hypervisor list/parameters
2729
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
2730
    if self.op.hvparams:
2731
      for hv_name, hv_dict in self.op.hvparams.items():
2732
        if hv_name not in self.new_hvparams:
2733
          self.new_hvparams[hv_name] = hv_dict
2734
        else:
2735
          self.new_hvparams[hv_name].update(hv_dict)
2736

    
2737
    # os hypervisor parameters
2738
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2739
    if self.op.os_hvp:
2740
      for os_name, hvs in self.op.os_hvp.items():
2741
        if os_name not in self.new_os_hvp:
2742
          self.new_os_hvp[os_name] = hvs
2743
        else:
2744
          for hv_name, hv_dict in hvs.items():
2745
            if hv_name not in self.new_os_hvp[os_name]:
2746
              self.new_os_hvp[os_name][hv_name] = hv_dict
2747
            else:
2748
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2749

    
2750
    # os parameters
2751
    self.new_osp = objects.FillDict(cluster.osparams, {})
2752
    if self.op.osparams:
2753
      for os_name, osp in self.op.osparams.items():
2754
        if os_name not in self.new_osp:
2755
          self.new_osp[os_name] = {}
2756

    
2757
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
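    # An empty string for vg_name or drbd_helper is normalized to None
    # below, i.e. the respective setting is cleared rather than changed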
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

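    # helper_os applies a list of (action, value) pairs to one of the
    # cluster-level OS lists; e.g. (illustrative) self.op.hidden_os could be
    # [(constants.DDM_ADD, "debian-image"), (constants.DDM_REMOVE, "old-os")]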
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    self.cfg.Update(self.cluster, feedback_fn)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
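  # Typical calls (both appear further down in this module): plain
  # _RedistributeAncillaryFiles(lu) after configuration changes, or, when a
  # node not yet in the configuration is being added (see LUAddNode.Exec),
  # _RedistributeAncillaryFiles(lu, additional_nodes=[node], additional_vm=...)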
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  if myself.name in vm_nodes:
    vm_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
                   ])

  vm_files = set()
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    vm_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    _UploadHelper(lu, dist_nodes, fname)
  for fname in vm_files:
    _UploadHelper(lu, vm_nodes, fname)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _HID = "hidden"
  _BLK = "blacklisted"
  _VLD = "valid"
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
                                   "parameters", "api_versions", _HID, _BLK)

  def CheckArguments(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []
    cluster = self.cfg.GetClusterInfo()

    for os_name in utils.NiceSort(pol.keys()):
      os_data = pol[os_name]
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      is_hid = os_name in cluster.hidden_os
      is_blk = os_name in cluster.blacklisted_os
      if ((self._HID not in self.op.output_fields and is_hid) or
          (self._BLK not in self.op.output_fields and is_blk) or
          (self._VLD not in self.op.output_fields and not valid)):
        continue

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == self._VLD:
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = utils.NiceSort(list(variants))
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        elif field == self._HID:
          val = is_hid
        elif field == self._BLK:
          val = is_blk
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("use_locking", False, ht.TBool),
    ]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained",
                    "master_capable", "vm_capable"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

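    # Example: a query for purely static fields such as ["name", "offline"]
    # leaves do_node_query False, so Exec skips the node_info RPC and no
    # node locks are acquired; live fields such as "mfree" trigger the RPC
    # (and locking, if use_locking was requested)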
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_PARAMS = [
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  _OP_PARAMS = [
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ("name", None, ht.TMaybeString),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_PARAMS = [
    _PNodeName,
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("name", ht.NoDefault, ht.TNonEmptyString),
    ("changes", ht.NoDefault, ht.TDict),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Modifies the storage unit on the given node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("primary_ip", None, ht.NoType),
    ("secondary_ip", None, ht.TMaybeString),
    ("readd", False, ht.TBool),
    ("group", None, ht.TMaybeString),
    ("master_capable", None, ht.TMaybeBool),
    ("vm_capable", None, ht.TMaybeBool),
    ]
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name
    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      old_node = self.cfg.GetNodeInfo(node)
      assert old_node is not None, "Can't retrieve locked node %s" % node
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(old_node, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(node)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = old_node
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_ADD,
                                              self.hostname.name,
                                              self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
                               False)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node],
                                  additional_vm=self.op.vm_capable)
      self.context.AddNode(new_node, self.proc.GetECId())


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("master_candidate", None, ht.TMaybeBool),
    ("offline", None, ht.TMaybeBool),
    ("drained", None, ht.TMaybeBool),
    ("auto_promote", False, ht.TBool),
    ("master_capable", None, ht.TMaybeBool),
    ("vm_capable", None, ht.TMaybeBool),
    ("secondary_ip", None, ht.TMaybeString),
    _PForce,
    ]
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

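  # Example: a node whose flags are (master_candidate=True, drained=False,
  # offline=False) maps to _ROLE_CANDIDATE via _F2R; _R2F is the inverse
  # mapping and turns a target role back into the corresponding flag tuple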
  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate == False or
                         self.op.offline == True or
                         self.op.drained == True or
                         self.op.master_capable == False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"