root / lib / cmdlib.py @ 661515f6

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import query
58
from ganeti import qlang
59
from ganeti import opcodes
60

    
61
import ganeti.masterd.instance # pylint: disable-msg=W0611
62

    
63

    
64
def _SupportsOob(cfg, node):
65
  """Tells if node supports OOB.
66

67
  @type cfg: L{config.ConfigWriter}
68
  @param cfg: The cluster configuration
69
  @type node: L{objects.Node}
70
  @param node: The node
71
  @return: The OOB script if supported or an empty string otherwise
72

73
  """
74
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
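
# Illustrative usage (a sketch, not part of the original module; it assumes an
# LU context where self.cfg is the cluster configuration):
#
#   oob_program = _SupportsOob(self.cfg, node)
#   if not oob_program:
#     raise errors.OpPrereqError("OOB is not supported for node %s" %
#                                node.name, errors.ECODE_STATE)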
75

    
76

    
77
class ResultWithJobs:
78
  """Data container for LU results with jobs.
79

80
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
81
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
82
  contained in the C{jobs} attribute and include the job IDs in the opcode
83
  result.
84

85
  """
86
  def __init__(self, jobs, **kwargs):
87
    """Initializes this class.
88

89
    Additional return values can be specified as keyword arguments.
90

91
    @type jobs: list of lists of L{opcodes.OpCode}
92
    @param jobs: A list of lists of opcode objects
93

94
    """
95
    self.jobs = jobs
96
    self.other = kwargs
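
  # Illustrative sketch (not part of the original class): an LU's Exec method
  # could hand follow-up jobs back to the processor like this, where op1, op2a
  # and op2b stand for already-built opcode objects:
  #
  #   return ResultWithJobs([[op1], [op2a, op2b]], moved_instances=["inst1"])
  #
  # Each inner list becomes one submitted job; the keyword arguments are kept
  # in self.other as additional return values.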
97

    
98

    
99
class LogicalUnit(object):
100
  """Logical Unit base class.
101

102
  Subclasses must follow these rules:
103
    - implement ExpandNames
104
    - implement CheckPrereq (except when tasklets are used)
105
    - implement Exec (except when tasklets are used)
106
    - implement BuildHooksEnv
107
    - implement BuildHooksNodes
108
    - redefine HPATH and HTYPE
109
    - optionally redefine their run requirements:
110
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
111

112
  Note that all commands require root permissions.
113

114
  @ivar dry_run_result: the value (if any) that will be returned to the caller
115
      in dry-run mode (signalled by opcode dry_run parameter)
116

117
  """
118
  HPATH = None
119
  HTYPE = None
120
  REQ_BGL = True
121

    
122
  def __init__(self, processor, op, context, rpc):
123
    """Constructor for LogicalUnit.
124

125
    This needs to be overridden in derived classes in order to check op
126
    validity.
127

128
    """
129
    self.proc = processor
130
    self.op = op
131
    self.cfg = context.cfg
132
    self.context = context
133
    self.rpc = rpc
134
    # Dicts used to declare locking needs to mcpu
135
    self.needed_locks = None
136
    self.acquired_locks = {}
137
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
138
    self.add_locks = {}
139
    self.remove_locks = {}
140
    # Used to force good behavior when calling helper functions
141
    self.recalculate_locks = {}
142
    # logging
143
    self.Log = processor.Log # pylint: disable-msg=C0103
144
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
145
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
146
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
147
    # support for dry-run
148
    self.dry_run_result = None
149
    # support for generic debug attribute
150
    if (not hasattr(self.op, "debug_level") or
151
        not isinstance(self.op.debug_level, int)):
152
      self.op.debug_level = 0
153

    
154
    # Tasklets
155
    self.tasklets = None
156

    
157
    # Validate opcode parameters and set defaults
158
    self.op.Validate(True)
159

    
160
    self.CheckArguments()
161

    
162
  def CheckArguments(self):
163
    """Check syntactic validity for the opcode arguments.
164

165
    This method is for doing a simple syntactic check and ensuring
166
    validity of opcode parameters, without any cluster-related
167
    checks. While the same can be accomplished in ExpandNames and/or
168
    CheckPrereq, doing these separately is better because:
169

170
      - ExpandNames is left as purely a lock-related function
171
      - CheckPrereq is run after we have acquired locks (and possibly
172
        waited for them)
173

174
    The function is allowed to change the self.op attribute so that
175
    later methods no longer need to worry about missing parameters.
176

177
    """
178
    pass
179

    
180
  def ExpandNames(self):
181
    """Expand names for this LU.
182

183
    This method is called before starting to execute the opcode, and it should
184
    update all the parameters of the opcode to their canonical form (e.g. a
185
    short node name must be fully expanded after this method has successfully
186
    completed). This way locking, hooks, logging, etc. can work correctly.
187

188
    LUs which implement this method must also populate the self.needed_locks
189
    member, as a dict with lock levels as keys, and a list of needed lock names
190
    as values. Rules:
191

192
      - use an empty dict if you don't need any lock
193
      - if you don't need any lock at a particular level omit that level
194
      - don't put anything for the BGL level
195
      - if you want all locks at a level use locking.ALL_SET as a value
196

197
    If you need to share locks (rather than acquire them exclusively) at one
198
    level you can modify self.share_locks, setting a true value (usually 1) for
199
    that level. By default locks are not shared.
200

201
    This function can also define a list of tasklets, which then will be
202
    executed in order instead of the usual LU-level CheckPrereq and Exec
203
    functions, if those are not defined by the LU.
204

205
    Examples::
206

207
      # Acquire all nodes and one instance
208
      self.needed_locks = {
209
        locking.LEVEL_NODE: locking.ALL_SET,
210
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
211
      }
212
      # Acquire just two nodes
213
      self.needed_locks = {
214
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
215
      }
216
      # Acquire no locks
217
      self.needed_locks = {} # No, you can't leave it to the default value None
218

219
    """
220
    # The implementation of this method is mandatory only if the new LU is
221
    # concurrent, so that old LUs don't need to be changed all at the same
222
    # time.
223
    if self.REQ_BGL:
224
      self.needed_locks = {} # Exclusive LUs don't need locks.
225
    else:
226
      raise NotImplementedError
227

    
228
  def DeclareLocks(self, level):
229
    """Declare LU locking needs for a level
230

231
    While most LUs can just declare their locking needs at ExpandNames time,
232
    sometimes there's the need to calculate some locks after having acquired
233
    the ones before. This function is called just before acquiring locks at a
234
    particular level, but after acquiring the ones at lower levels, and permits
235
    such calculations. It can be used to modify self.needed_locks, and by
236
    default it does nothing.
237

238
    This function is only called if you have something already set in
239
    self.needed_locks for the level.
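
    For example (an illustrative sketch, mirroring the pattern documented in
    L{_LockInstancesNodes} below)::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()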
240

241
    @param level: Locking level which is going to be locked
242
    @type level: member of ganeti.locking.LEVELS
243

244
    """
245

    
246
  def CheckPrereq(self):
247
    """Check prerequisites for this LU.
248

249
    This method should check that the prerequisites for the execution
250
    of this LU are fulfilled. It can do internode communication, but
251
    it should be idempotent - no cluster or system changes are
252
    allowed.
253

254
    The method should raise errors.OpPrereqError in case something is
255
    not fulfilled. Its return value is ignored.
256

257
    This method should also update all the parameters of the opcode to
258
    their canonical form if it hasn't been done by ExpandNames before.
259

260
    """
261
    if self.tasklets is not None:
262
      for (idx, tl) in enumerate(self.tasklets):
263
        logging.debug("Checking prerequisites for tasklet %s/%s",
264
                      idx + 1, len(self.tasklets))
265
        tl.CheckPrereq()
266
    else:
267
      pass
268

    
269
  def Exec(self, feedback_fn):
270
    """Execute the LU.
271

272
    This method should implement the actual work. It should raise
273
    errors.OpExecError for failures that are somewhat dealt with in
274
    code, or expected.
275

276
    """
277
    if self.tasklets is not None:
278
      for (idx, tl) in enumerate(self.tasklets):
279
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
280
        tl.Exec(feedback_fn)
281
    else:
282
      raise NotImplementedError
283

    
284
  def BuildHooksEnv(self):
285
    """Build hooks environment for this LU.
286

287
    @rtype: dict
288
    @return: Dictionary containing the environment that will be used for
289
      running the hooks for this LU. The keys of the dict must not be prefixed
290
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
291
      will extend the environment with additional variables. If no environment
292
      should be defined, an empty dictionary should be returned (not C{None}).
293
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
294
      will not be called.
295

296
    """
297
    raise NotImplementedError
298

    
299
  def BuildHooksNodes(self):
300
    """Build list of nodes to run LU's hooks.
301

302
    @rtype: tuple; (list, list)
303
    @return: Tuple containing a list of node names on which the hook
304
      should run before the execution and a list of node names on which the
305
      hook should run after the execution. No nodes should be returned as an
306
      empty list (and not None).
307
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
308
      will not be called.
309

310
    """
311
    raise NotImplementedError
312

    
313
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
314
    """Notify the LU about the results of its hooks.
315

316
    This method is called every time a hooks phase is executed, and notifies
317
    the Logical Unit about the hooks' result. The LU can then use it to alter
318
    its result based on the hooks.  By default the method does nothing and the
319
    previous result is passed back unchanged but any LU can define it if it
320
    wants to use the local cluster hook-scripts somehow.
321

322
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
323
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
324
    @param hook_results: the results of the multi-node hooks rpc call
325
    @param feedback_fn: function used to send feedback back to the caller
326
    @param lu_result: the previous Exec result this LU had, or None
327
        in the PRE phase
328
    @return: the new Exec result, based on the previous result
329
        and hook results
330

331
    """
332
    # API must be kept, thus we ignore the 'unused argument' and
333
    # 'could be a function' warnings
334
    # pylint: disable-msg=W0613,R0201
335
    return lu_result
336

    
337
  def _ExpandAndLockInstance(self):
338
    """Helper function to expand and lock an instance.
339

340
    Many LUs that work on an instance take its name in self.op.instance_name
341
    and need to expand it and then declare the expanded name for locking. This
342
    function does it, and then updates self.op.instance_name to the expanded
343
    name. It also initializes needed_locks as a dict, if this hasn't been done
344
    before.
345

346
    """
347
    if self.needed_locks is None:
348
      self.needed_locks = {}
349
    else:
350
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
351
        "_ExpandAndLockInstance called with instance-level locks set"
352
    self.op.instance_name = _ExpandInstanceName(self.cfg,
353
                                                self.op.instance_name)
354
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
355

    
356
  def _LockInstancesNodes(self, primary_only=False):
357
    """Helper function to declare instances' nodes for locking.
358

359
    This function should be called after locking one or more instances to lock
360
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
361
    with all primary or secondary nodes for instances already locked and
362
    present in self.needed_locks[locking.LEVEL_INSTANCE].
363

364
    It should be called from DeclareLocks, and for safety only works if
365
    self.recalculate_locks[locking.LEVEL_NODE] is set.
366

367
    In the future it may grow parameters to just lock some instance's nodes, or
368
    to just lock primaries or secondary nodes, if needed.
369

370
    It should be called in DeclareLocks in a way similar to::
371

372
      if level == locking.LEVEL_NODE:
373
        self._LockInstancesNodes()
374

375
    @type primary_only: boolean
376
    @param primary_only: only lock primary nodes of locked instances
377

378
    """
379
    assert locking.LEVEL_NODE in self.recalculate_locks, \
380
      "_LockInstancesNodes helper function called with no nodes to recalculate"
381

    
382
    # TODO: check if we've really been called with the instance locks held
383

    
384
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
385
    # future we might want to have different behaviors depending on the value
386
    # of self.recalculate_locks[locking.LEVEL_NODE]
387
    wanted_nodes = []
388
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
389
      instance = self.context.cfg.GetInstanceInfo(instance_name)
390
      wanted_nodes.append(instance.primary_node)
391
      if not primary_only:
392
        wanted_nodes.extend(instance.secondary_nodes)
393

    
394
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
395
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
396
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
397
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
398

    
399
    del self.recalculate_locks[locking.LEVEL_NODE]
400

    
401

    
402
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
403
  """Simple LU which runs no hooks.
404

405
  This LU is intended as a parent for other LogicalUnits which will
406
  run no hooks, in order to reduce duplicate code.
407

408
  """
409
  HPATH = None
410
  HTYPE = None
411

    
412
  def BuildHooksEnv(self):
413
    """Empty BuildHooksEnv for NoHooksLu.
414

415
    This just raises an error.
416

417
    """
418
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
419

    
420
  def BuildHooksNodes(self):
421
    """Empty BuildHooksNodes for NoHooksLU.
422

423
    """
424
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
425

    
426

    
427
class Tasklet:
428
  """Tasklet base class.
429

430
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
431
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
432
  tasklets know nothing about locks.
433

434
  Subclasses must follow these rules:
435
    - Implement CheckPrereq
436
    - Implement Exec
437

438
  """
439
  def __init__(self, lu):
440
    self.lu = lu
441

    
442
    # Shortcuts
443
    self.cfg = lu.cfg
444
    self.rpc = lu.rpc
445

    
446
  def CheckPrereq(self):
447
    """Check prerequisites for this tasklets.
448

449
    This method should check whether the prerequisites for the execution of
450
    this tasklet are fulfilled. It can do internode communication, but it
451
    should be idempotent - no cluster or system changes are allowed.
452

453
    The method should raise errors.OpPrereqError in case something is not
454
    fulfilled. Its return value is ignored.
455

456
    This method should also update all parameters to their canonical form if it
457
    hasn't been done before.
458

459
    """
460
    pass
461

    
462
  def Exec(self, feedback_fn):
463
    """Execute the tasklet.
464

465
    This method should implement the actual work. It should raise
466
    errors.OpExecError for failures that are somewhat dealt with in code, or
467
    expected.
468

469
    """
470
    raise NotImplementedError
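
# Illustrative sketch (not part of the original module; the names below are
# invented for the example): a minimal tasklet and how an LU would wire it up.
#
#   class _DemoTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass                      # nothing to verify in this toy example
#
#     def Exec(self, feedback_fn):
#       feedback_fn("demo tasklet ran")
#
#   # ...and inside some LU's ExpandNames():
#   #   self.tasklets = [_DemoTasklet(self)]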
471

    
472

    
473
class _QueryBase:
474
  """Base for query utility classes.
475

476
  """
477
  #: Attribute holding field definitions
478
  FIELDS = None
479

    
480
  def __init__(self, filter_, fields, use_locking):
481
    """Initializes this class.
482

483
    """
484
    self.use_locking = use_locking
485

    
486
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
487
                             namefield="name")
488
    self.requested_data = self.query.RequestedData()
489
    self.names = self.query.RequestedNames()
490

    
491
    # Sort only if no names were requested
492
    self.sort_by_name = not self.names
493

    
494
    self.do_locking = None
495
    self.wanted = None
496

    
497
  def _GetNames(self, lu, all_names, lock_level):
498
    """Helper function to determine names asked for in the query.
499

500
    """
501
    if self.do_locking:
502
      names = lu.acquired_locks[lock_level]
503
    else:
504
      names = all_names
505

    
506
    if self.wanted == locking.ALL_SET:
507
      assert not self.names
508
      # caller didn't specify names, so ordering is not important
509
      return utils.NiceSort(names)
510

    
511
    # caller specified names and we must keep the same order
512
    assert self.names
513
    assert not self.do_locking or lu.acquired_locks[lock_level]
514

    
515
    missing = set(self.wanted).difference(names)
516
    if missing:
517
      raise errors.OpExecError("Some items were removed before retrieving"
518
                               " their data: %s" % missing)
519

    
520
    # Return expanded names
521
    return self.wanted
522

    
523
  def ExpandNames(self, lu):
524
    """Expand names for this query.
525

526
    See L{LogicalUnit.ExpandNames}.
527

528
    """
529
    raise NotImplementedError()
530

    
531
  def DeclareLocks(self, lu, level):
532
    """Declare locks for this query.
533

534
    See L{LogicalUnit.DeclareLocks}.
535

536
    """
537
    raise NotImplementedError()
538

    
539
  def _GetQueryData(self, lu):
540
    """Collects all data for this query.
541

542
    @return: Query data object
543

544
    """
545
    raise NotImplementedError()
546

    
547
  def NewStyleQuery(self, lu):
548
    """Collect data and execute query.
549

550
    """
551
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
552
                                  sort_by_name=self.sort_by_name)
553

    
554
  def OldStyleQuery(self, lu):
555
    """Collect data and execute query.
556

557
    """
558
    return self.query.OldStyleQuery(self._GetQueryData(lu),
559
                                    sort_by_name=self.sort_by_name)
560

    
561

    
562
def _GetWantedNodes(lu, nodes):
563
  """Returns list of checked and expanded node names.
564

565
  @type lu: L{LogicalUnit}
566
  @param lu: the logical unit on whose behalf we execute
567
  @type nodes: list
568
  @param nodes: list of node names or None for all nodes
569
  @rtype: list
570
  @return: the list of nodes, sorted
571
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
572

573
  """
574
  if nodes:
575
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
576

    
577
  return utils.NiceSort(lu.cfg.GetNodeList())
578

    
579

    
580
def _GetWantedInstances(lu, instances):
581
  """Returns list of checked and expanded instance names.
582

583
  @type lu: L{LogicalUnit}
584
  @param lu: the logical unit on whose behalf we execute
585
  @type instances: list
586
  @param instances: list of instance names or None for all instances
587
  @rtype: list
588
  @return: the list of instances, sorted
589
  @raise errors.OpPrereqError: if the instances parameter is wrong type
590
  @raise errors.OpPrereqError: if any of the passed instances is not found
591

592
  """
593
  if instances:
594
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
595
  else:
596
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
597
  return wanted
598

    
599

    
600
def _GetUpdatedParams(old_params, update_dict,
601
                      use_default=True, use_none=False):
602
  """Return the new version of a parameter dictionary.
603

604
  @type old_params: dict
605
  @param old_params: old parameters
606
  @type update_dict: dict
607
  @param update_dict: dict containing new parameter values, or
608
      constants.VALUE_DEFAULT to reset the parameter to its default
609
      value
610
  @type use_default: boolean
611
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
612
      values as 'to be deleted' values
613
  @type use_none: boolean
614
  @param use_none: whether to recognise C{None} values as 'to be
615
      deleted' values
616
  @rtype: dict
617
  @return: the new parameter dictionary
618

619
  """
620
  params_copy = copy.deepcopy(old_params)
621
  for key, val in update_dict.iteritems():
622
    if ((use_default and val == constants.VALUE_DEFAULT) or
623
        (use_none and val is None)):
624
      try:
625
        del params_copy[key]
626
      except KeyError:
627
        pass
628
    else:
629
      params_copy[key] = val
630
  return params_copy
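
# Illustrative example (not part of the original module): with the default
# use_default=True, a VALUE_DEFAULT entry removes the key (so it falls back to
# the cluster/group default), while other keys are simply overridden:
#
#   _GetUpdatedParams({"a": 1, "b": 2},
#                     {"a": constants.VALUE_DEFAULT, "c": 3})
#   -> {"b": 2, "c": 3}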
631

    
632

    
633
def _RunPostHook(lu, node_name):
634
  """Runs the post-hook for an opcode on a single node.
635

636
  """
637
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
638
  try:
639
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
640
  except:
641
    # pylint: disable-msg=W0702
642
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
643

    
644

    
645
def _CheckOutputFields(static, dynamic, selected):
646
  """Checks whether all selected fields are valid.
647

648
  @type static: L{utils.FieldSet}
649
  @param static: static fields set
650
  @type dynamic: L{utils.FieldSet}
651
  @param dynamic: dynamic fields set
652

653
  """
654
  f = utils.FieldSet()
655
  f.Extend(static)
656
  f.Extend(dynamic)
657

    
658
  delta = f.NonMatching(selected)
659
  if delta:
660
    raise errors.OpPrereqError("Unknown output fields selected: %s"
661
                               % ",".join(delta), errors.ECODE_INVAL)
662

    
663

    
664
def _CheckGlobalHvParams(params):
665
  """Validates that given hypervisor params are not global ones.
666

667
  This will ensure that instances don't get customised versions of
668
  global params.
669

670
  """
671
  used_globals = constants.HVC_GLOBALS.intersection(params)
672
  if used_globals:
673
    msg = ("The following hypervisor parameters are global and cannot"
674
           " be customized at instance level, please modify them at"
675
           " cluster level: %s" % utils.CommaJoin(used_globals))
676
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
677

    
678

    
679
def _CheckNodeOnline(lu, node, msg=None):
680
  """Ensure that a given node is online.
681

682
  @param lu: the LU on behalf of which we make the check
683
  @param node: the node to check
684
  @param msg: if passed, should be a message to replace the default one
685
  @raise errors.OpPrereqError: if the node is offline
686

687
  """
688
  if msg is None:
689
    msg = "Can't use offline node"
690
  if lu.cfg.GetNodeInfo(node).offline:
691
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
692

    
693

    
694
def _CheckNodeNotDrained(lu, node):
695
  """Ensure that a given node is not drained.
696

697
  @param lu: the LU on behalf of which we make the check
698
  @param node: the node to check
699
  @raise errors.OpPrereqError: if the node is drained
700

701
  """
702
  if lu.cfg.GetNodeInfo(node).drained:
703
    raise errors.OpPrereqError("Can't use drained node %s" % node,
704
                               errors.ECODE_STATE)
705

    
706

    
707
def _CheckNodeVmCapable(lu, node):
708
  """Ensure that a given node is vm capable.
709

710
  @param lu: the LU on behalf of which we make the check
711
  @param node: the node to check
712
  @raise errors.OpPrereqError: if the node is not vm capable
713

714
  """
715
  if not lu.cfg.GetNodeInfo(node).vm_capable:
716
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
717
                               errors.ECODE_STATE)
718

    
719

    
720
def _CheckNodeHasOS(lu, node, os_name, force_variant):
721
  """Ensure that a node supports a given OS.
722

723
  @param lu: the LU on behalf of which we make the check
724
  @param node: the node to check
725
  @param os_name: the OS to query about
726
  @param force_variant: whether to ignore variant errors
727
  @raise errors.OpPrereqError: if the node does not support the OS
728

729
  """
730
  result = lu.rpc.call_os_get(node, os_name)
731
  result.Raise("OS '%s' not in supported OS list for node %s" %
732
               (os_name, node),
733
               prereq=True, ecode=errors.ECODE_INVAL)
734
  if not force_variant:
735
    _CheckOSVariant(result.payload, os_name)
736

    
737

    
738
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
739
  """Ensure that a node has the given secondary ip.
740

741
  @type lu: L{LogicalUnit}
742
  @param lu: the LU on behalf of which we make the check
743
  @type node: string
744
  @param node: the node to check
745
  @type secondary_ip: string
746
  @param secondary_ip: the ip to check
747
  @type prereq: boolean
748
  @param prereq: whether to throw a prerequisite or an execute error
749
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
750
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
751

752
  """
753
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
754
  result.Raise("Failure checking secondary ip on node %s" % node,
755
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
756
  if not result.payload:
757
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
758
           " please fix and re-run this command" % secondary_ip)
759
    if prereq:
760
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
761
    else:
762
      raise errors.OpExecError(msg)
763

    
764

    
765
def _GetClusterDomainSecret():
766
  """Reads the cluster domain secret.
767

768
  """
769
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
770
                               strict=True)
771

    
772

    
773
def _CheckInstanceDown(lu, instance, reason):
774
  """Ensure that an instance is not running."""
775
  if instance.admin_up:
776
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
777
                               (instance.name, reason), errors.ECODE_STATE)
778

    
779
  pnode = instance.primary_node
780
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
781
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
782
              prereq=True, ecode=errors.ECODE_ENVIRON)
783

    
784
  if instance.name in ins_l.payload:
785
    raise errors.OpPrereqError("Instance %s is running, %s" %
786
                               (instance.name, reason), errors.ECODE_STATE)
787

    
788

    
789
def _ExpandItemName(fn, name, kind):
790
  """Expand an item name.
791

792
  @param fn: the function to use for expansion
793
  @param name: requested item name
794
  @param kind: text description ('Node' or 'Instance')
795
  @return: the resolved (full) name
796
  @raise errors.OpPrereqError: if the item is not found
797

798
  """
799
  full_name = fn(name)
800
  if full_name is None:
801
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
802
                               errors.ECODE_NOENT)
803
  return full_name
804

    
805

    
806
def _ExpandNodeName(cfg, name):
807
  """Wrapper over L{_ExpandItemName} for nodes."""
808
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
809

    
810

    
811
def _ExpandInstanceName(cfg, name):
812
  """Wrapper over L{_ExpandItemName} for instance."""
813
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
814

    
815

    
816
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
817
                          memory, vcpus, nics, disk_template, disks,
818
                          bep, hvp, hypervisor_name):
819
  """Builds instance related env variables for hooks
820

821
  This builds the hook environment from individual variables.
822

823
  @type name: string
824
  @param name: the name of the instance
825
  @type primary_node: string
826
  @param primary_node: the name of the instance's primary node
827
  @type secondary_nodes: list
828
  @param secondary_nodes: list of secondary nodes as strings
829
  @type os_type: string
830
  @param os_type: the name of the instance's OS
831
  @type status: boolean
832
  @param status: the should_run status of the instance
833
  @type memory: string
834
  @param memory: the memory size of the instance
835
  @type vcpus: string
836
  @param vcpus: the count of VCPUs the instance has
837
  @type nics: list
838
  @param nics: list of tuples (ip, mac, mode, link) representing
839
      the NICs the instance has
840
  @type disk_template: string
841
  @param disk_template: the disk template of the instance
842
  @type disks: list
843
  @param disks: the list of (size, mode) pairs
844
  @type bep: dict
845
  @param bep: the backend parameters for the instance
846
  @type hvp: dict
847
  @param hvp: the hypervisor parameters for the instance
848
  @type hypervisor_name: string
849
  @param hypervisor_name: the hypervisor for the instance
850
  @rtype: dict
851
  @return: the hook environment for this instance
852

853
  """
854
  if status:
855
    str_status = "up"
856
  else:
857
    str_status = "down"
858
  env = {
859
    "OP_TARGET": name,
860
    "INSTANCE_NAME": name,
861
    "INSTANCE_PRIMARY": primary_node,
862
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
863
    "INSTANCE_OS_TYPE": os_type,
864
    "INSTANCE_STATUS": str_status,
865
    "INSTANCE_MEMORY": memory,
866
    "INSTANCE_VCPUS": vcpus,
867
    "INSTANCE_DISK_TEMPLATE": disk_template,
868
    "INSTANCE_HYPERVISOR": hypervisor_name,
869
  }
870

    
871
  if nics:
872
    nic_count = len(nics)
873
    for idx, (ip, mac, mode, link) in enumerate(nics):
874
      if ip is None:
875
        ip = ""
876
      env["INSTANCE_NIC%d_IP" % idx] = ip
877
      env["INSTANCE_NIC%d_MAC" % idx] = mac
878
      env["INSTANCE_NIC%d_MODE" % idx] = mode
879
      env["INSTANCE_NIC%d_LINK" % idx] = link
880
      if mode == constants.NIC_MODE_BRIDGED:
881
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
882
  else:
883
    nic_count = 0
884

    
885
  env["INSTANCE_NIC_COUNT"] = nic_count
886

    
887
  if disks:
888
    disk_count = len(disks)
889
    for idx, (size, mode) in enumerate(disks):
890
      env["INSTANCE_DISK%d_SIZE" % idx] = size
891
      env["INSTANCE_DISK%d_MODE" % idx] = mode
892
  else:
893
    disk_count = 0
894

    
895
  env["INSTANCE_DISK_COUNT"] = disk_count
896

    
897
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
898
    for key, value in source.items():
899
      env["INSTANCE_%s_%s" % (kind, key)] = value
900

    
901
  return env
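
# Illustrative example (not part of the original module): a single bridged NIC
# and one disk would, among others, yield hook variables such as
#
#   INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_MODE=bridged, INSTANCE_NIC0_BRIDGE=<link>
#   INSTANCE_DISK_COUNT=1, INSTANCE_DISK0_SIZE=<size>, INSTANCE_DISK0_MODE=<mode>
#
# (the hooks runner later prefixes each key with "GANETI_").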
902

    
903

    
904
def _NICListToTuple(lu, nics):
905
  """Build a list of nic information tuples.
906

907
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
908
  value in LUInstanceQueryData.
909

910
  @type lu:  L{LogicalUnit}
911
  @param lu: the logical unit on whose behalf we execute
912
  @type nics: list of L{objects.NIC}
913
  @param nics: list of nics to convert to hooks tuples
914

915
  """
916
  hooks_nics = []
917
  cluster = lu.cfg.GetClusterInfo()
918
  for nic in nics:
919
    ip = nic.ip
920
    mac = nic.mac
921
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
922
    mode = filled_params[constants.NIC_MODE]
923
    link = filled_params[constants.NIC_LINK]
924
    hooks_nics.append((ip, mac, mode, link))
925
  return hooks_nics
926

    
927

    
928
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
929
  """Builds instance related env variables for hooks from an object.
930

931
  @type lu: L{LogicalUnit}
932
  @param lu: the logical unit on whose behalf we execute
933
  @type instance: L{objects.Instance}
934
  @param instance: the instance for which we should build the
935
      environment
936
  @type override: dict
937
  @param override: dictionary with key/values that will override
938
      our values
939
  @rtype: dict
940
  @return: the hook environment dictionary
941

942
  """
943
  cluster = lu.cfg.GetClusterInfo()
944
  bep = cluster.FillBE(instance)
945
  hvp = cluster.FillHV(instance)
946
  args = {
947
    'name': instance.name,
948
    'primary_node': instance.primary_node,
949
    'secondary_nodes': instance.secondary_nodes,
950
    'os_type': instance.os,
951
    'status': instance.admin_up,
952
    'memory': bep[constants.BE_MEMORY],
953
    'vcpus': bep[constants.BE_VCPUS],
954
    'nics': _NICListToTuple(lu, instance.nics),
955
    'disk_template': instance.disk_template,
956
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
957
    'bep': bep,
958
    'hvp': hvp,
959
    'hypervisor_name': instance.hypervisor,
960
  }
961
  if override:
962
    args.update(override)
963
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
964

    
965

    
966
def _AdjustCandidatePool(lu, exceptions):
967
  """Adjust the candidate pool after node operations.
968

969
  """
970
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
971
  if mod_list:
972
    lu.LogInfo("Promoted nodes to master candidate role: %s",
973
               utils.CommaJoin(node.name for node in mod_list))
974
    for name in mod_list:
975
      lu.context.ReaddNode(name)
976
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
977
  if mc_now > mc_max:
978
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
979
               (mc_now, mc_max))
980

    
981

    
982
def _DecideSelfPromotion(lu, exceptions=None):
983
  """Decide whether I should promote myself as a master candidate.
984

985
  """
986
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
987
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
988
  # the new node will increase mc_max by one, so:
989
  mc_should = min(mc_should + 1, cp_size)
990
  return mc_now < mc_should
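
# Illustrative example (not part of the original module): with
# candidate_pool_size=10, mc_now=3 and mc_should=3, the node being added bumps
# mc_should to min(3 + 1, 10) = 4 > 3, so the function returns True and the
# node promotes itself to master candidate.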
991

    
992

    
993
def _CheckNicsBridgesExist(lu, target_nics, target_node):
994
  """Check that the brigdes needed by a list of nics exist.
995

996
  """
997
  cluster = lu.cfg.GetClusterInfo()
998
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
999
  brlist = [params[constants.NIC_LINK] for params in paramslist
1000
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1001
  if brlist:
1002
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1003
    result.Raise("Error checking bridges on destination node '%s'" %
1004
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1005

    
1006

    
1007
def _CheckInstanceBridgesExist(lu, instance, node=None):
1008
  """Check that the brigdes needed by an instance exist.
1009

1010
  """
1011
  if node is None:
1012
    node = instance.primary_node
1013
  _CheckNicsBridgesExist(lu, instance.nics, node)
1014

    
1015

    
1016
def _CheckOSVariant(os_obj, name):
1017
  """Check whether an OS name conforms to the os variants specification.
1018

1019
  @type os_obj: L{objects.OS}
1020
  @param os_obj: OS object to check
1021
  @type name: string
1022
  @param name: OS name passed by the user, to check for validity
1023

1024
  """
1025
  if not os_obj.supported_variants:
1026
    return
1027
  variant = objects.OS.GetVariant(name)
1028
  if not variant:
1029
    raise errors.OpPrereqError("OS name must include a variant",
1030
                               errors.ECODE_INVAL)
1031

    
1032
  if variant not in os_obj.supported_variants:
1033
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1034

    
1035

    
1036
def _GetNodeInstancesInner(cfg, fn):
1037
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1038

    
1039

    
1040
def _GetNodeInstances(cfg, node_name):
1041
  """Returns a list of all primary and secondary instances on a node.
1042

1043
  """
1044

    
1045
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1046

    
1047

    
1048
def _GetNodePrimaryInstances(cfg, node_name):
1049
  """Returns primary instances on a node.
1050

1051
  """
1052
  return _GetNodeInstancesInner(cfg,
1053
                                lambda inst: node_name == inst.primary_node)
1054

    
1055

    
1056
def _GetNodeSecondaryInstances(cfg, node_name):
1057
  """Returns secondary instances on a node.
1058

1059
  """
1060
  return _GetNodeInstancesInner(cfg,
1061
                                lambda inst: node_name in inst.secondary_nodes)
1062

    
1063

    
1064
def _GetStorageTypeArgs(cfg, storage_type):
1065
  """Returns the arguments for a storage type.
1066

1067
  """
1068
  # Special case for file storage
1069
  if storage_type == constants.ST_FILE:
1070
    # storage.FileStorage wants a list of storage directories
1071
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1072

    
1073
  return []
1074

    
1075

    
1076
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1077
  faulty = []
1078

    
1079
  for dev in instance.disks:
1080
    cfg.SetDiskID(dev, node_name)
1081

    
1082
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1083
  result.Raise("Failed to get disk status from node %s" % node_name,
1084
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1085

    
1086
  for idx, bdev_status in enumerate(result.payload):
1087
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1088
      faulty.append(idx)
1089

    
1090
  return faulty
1091

    
1092

    
1093
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1094
  """Check the sanity of iallocator and node arguments and use the
1095
  cluster-wide iallocator if appropriate.
1096

1097
  Check that at most one of (iallocator, node) is specified. If none is
1098
  specified, then the LU's opcode's iallocator slot is filled with the
1099
  cluster-wide default iallocator.
1100

1101
  @type iallocator_slot: string
1102
  @param iallocator_slot: the name of the opcode iallocator slot
1103
  @type node_slot: string
1104
  @param node_slot: the name of the opcode target node slot
1105

1106
  """
1107
  node = getattr(lu.op, node_slot, None)
1108
  iallocator = getattr(lu.op, iallocator_slot, None)
1109

    
1110
  if node is not None and iallocator is not None:
1111
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1112
                               errors.ECODE_INVAL)
1113
  elif node is None and iallocator is None:
1114
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1115
    if default_iallocator:
1116
      setattr(lu.op, iallocator_slot, default_iallocator)
1117
    else:
1118
      raise errors.OpPrereqError("No iallocator or node given and no"
1119
                                 " cluster-wide default iallocator found."
1120
                                 " Please specify either an iallocator or a"
1121
                                 " node, or set a cluster-wide default"
1122
                                 " iallocator.")
1123

    
1124

    
1125
class LUClusterPostInit(LogicalUnit):
1126
  """Logical unit for running hooks after cluster initialization.
1127

1128
  """
1129
  HPATH = "cluster-init"
1130
  HTYPE = constants.HTYPE_CLUSTER
1131

    
1132
  def BuildHooksEnv(self):
1133
    """Build hooks env.
1134

1135
    """
1136
    return {
1137
      "OP_TARGET": self.cfg.GetClusterName(),
1138
      }
1139

    
1140
  def BuildHooksNodes(self):
1141
    """Build hooks nodes.
1142

1143
    """
1144
    return ([], [self.cfg.GetMasterNode()])
1145

    
1146
  def Exec(self, feedback_fn):
1147
    """Nothing to do.
1148

1149
    """
1150
    return True
1151

    
1152

    
1153
class LUClusterDestroy(LogicalUnit):
1154
  """Logical unit for destroying the cluster.
1155

1156
  """
1157
  HPATH = "cluster-destroy"
1158
  HTYPE = constants.HTYPE_CLUSTER
1159

    
1160
  def BuildHooksEnv(self):
1161
    """Build hooks env.
1162

1163
    """
1164
    return {
1165
      "OP_TARGET": self.cfg.GetClusterName(),
1166
      }
1167

    
1168
  def BuildHooksNodes(self):
1169
    """Build hooks nodes.
1170

1171
    """
1172
    return ([], [])
1173

    
1174
  def CheckPrereq(self):
1175
    """Check prerequisites.
1176

1177
    This checks whether the cluster is empty.
1178

1179
    Any errors are signaled by raising errors.OpPrereqError.
1180

1181
    """
1182
    master = self.cfg.GetMasterNode()
1183

    
1184
    nodelist = self.cfg.GetNodeList()
1185
    if len(nodelist) != 1 or nodelist[0] != master:
1186
      raise errors.OpPrereqError("There are still %d node(s) in"
1187
                                 " this cluster." % (len(nodelist) - 1),
1188
                                 errors.ECODE_INVAL)
1189
    instancelist = self.cfg.GetInstanceList()
1190
    if instancelist:
1191
      raise errors.OpPrereqError("There are still %d instance(s) in"
1192
                                 " this cluster." % len(instancelist),
1193
                                 errors.ECODE_INVAL)
1194

    
1195
  def Exec(self, feedback_fn):
1196
    """Destroys the cluster.
1197

1198
    """
1199
    master = self.cfg.GetMasterNode()
1200

    
1201
    # Run post hooks on master node before it's removed
1202
    _RunPostHook(self, master)
1203

    
1204
    result = self.rpc.call_node_stop_master(master, False)
1205
    result.Raise("Could not disable the master role")
1206

    
1207
    return master
1208

    
1209

    
1210
def _VerifyCertificate(filename):
1211
  """Verifies a certificate for LUClusterVerify.
1212

1213
  @type filename: string
1214
  @param filename: Path to PEM file
1215

1216
  """
1217
  try:
1218
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1219
                                           utils.ReadFile(filename))
1220
  except Exception, err: # pylint: disable-msg=W0703
1221
    return (LUClusterVerify.ETYPE_ERROR,
1222
            "Failed to load X509 certificate %s: %s" % (filename, err))
1223

    
1224
  (errcode, msg) = \
1225
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1226
                                constants.SSL_CERT_EXPIRATION_ERROR)
1227

    
1228
  if msg:
1229
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1230
  else:
1231
    fnamemsg = None
1232

    
1233
  if errcode is None:
1234
    return (None, fnamemsg)
1235
  elif errcode == utils.CERT_WARNING:
1236
    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
1237
  elif errcode == utils.CERT_ERROR:
1238
    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
1239

    
1240
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1241

    
1242

    
1243
class LUClusterVerify(LogicalUnit):
1244
  """Verifies the cluster status.
1245

1246
  """
1247
  HPATH = "cluster-verify"
1248
  HTYPE = constants.HTYPE_CLUSTER
1249
  REQ_BGL = False
1250

    
1251
  TCLUSTER = "cluster"
1252
  TNODE = "node"
1253
  TINSTANCE = "instance"
1254

    
1255
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1256
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1257
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1258
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1259
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1260
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1261
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1262
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1263
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1264
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1265
  ENODEDRBD = (TNODE, "ENODEDRBD")
1266
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1267
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1268
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1269
  ENODEHV = (TNODE, "ENODEHV")
1270
  ENODELVM = (TNODE, "ENODELVM")
1271
  ENODEN1 = (TNODE, "ENODEN1")
1272
  ENODENET = (TNODE, "ENODENET")
1273
  ENODEOS = (TNODE, "ENODEOS")
1274
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1275
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1276
  ENODERPC = (TNODE, "ENODERPC")
1277
  ENODESSH = (TNODE, "ENODESSH")
1278
  ENODEVERSION = (TNODE, "ENODEVERSION")
1279
  ENODESETUP = (TNODE, "ENODESETUP")
1280
  ENODETIME = (TNODE, "ENODETIME")
1281
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1282

    
1283
  ETYPE_FIELD = "code"
1284
  ETYPE_ERROR = "ERROR"
1285
  ETYPE_WARNING = "WARNING"
1286

    
1287
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1288

    
1289
  class NodeImage(object):
1290
    """A class representing the logical and physical status of a node.
1291

1292
    @type name: string
1293
    @ivar name: the node name to which this object refers
1294
    @ivar volumes: a structure as returned from
1295
        L{ganeti.backend.GetVolumeList} (runtime)
1296
    @ivar instances: a list of running instances (runtime)
1297
    @ivar pinst: list of configured primary instances (config)
1298
    @ivar sinst: list of configured secondary instances (config)
1299
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1300
        instances for which this node is secondary (config)
1301
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1302
    @ivar dfree: free disk, as reported by the node (runtime)
1303
    @ivar offline: the offline status (config)
1304
    @type rpc_fail: boolean
1305
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1306
        not whether the individual keys were correct) (runtime)
1307
    @type lvm_fail: boolean
1308
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1309
    @type hyp_fail: boolean
1310
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1311
    @type ghost: boolean
1312
    @ivar ghost: whether this is a known node or not (config)
1313
    @type os_fail: boolean
1314
    @ivar os_fail: whether the RPC call didn't return valid OS data
1315
    @type oslist: list
1316
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1317
    @type vm_capable: boolean
1318
    @ivar vm_capable: whether the node can host instances
1319

1320
    """
1321
    def __init__(self, offline=False, name=None, vm_capable=True):
1322
      self.name = name
1323
      self.volumes = {}
1324
      self.instances = []
1325
      self.pinst = []
1326
      self.sinst = []
1327
      self.sbp = {}
1328
      self.mfree = 0
1329
      self.dfree = 0
1330
      self.offline = offline
1331
      self.vm_capable = vm_capable
1332
      self.rpc_fail = False
1333
      self.lvm_fail = False
1334
      self.hyp_fail = False
1335
      self.ghost = False
1336
      self.os_fail = False
1337
      self.oslist = {}
1338

    
1339
  def ExpandNames(self):
1340
    self.needed_locks = {
1341
      locking.LEVEL_NODE: locking.ALL_SET,
1342
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1343
    }
1344
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1345

    
1346
  def _Error(self, ecode, item, msg, *args, **kwargs):
1347
    """Format an error message.
1348

1349
    Based on the opcode's error_codes parameter, either format a
1350
    parseable error code, or a simpler error string.
1351

1352
    This must be called only from Exec and functions called from Exec.
1353

1354
    """
1355
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1356
    itype, etxt = ecode
1357
    # first complete the msg
1358
    if args:
1359
      msg = msg % args
1360
    # then format the whole message
1361
    if self.op.error_codes:
1362
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1363
    else:
1364
      if item:
1365
        item = " " + item
1366
      else:
1367
        item = ""
1368
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1369
    # and finally report it via the feedback_fn
1370
    self._feedback_fn("  - %s" % msg)
1371

    
1372
  def _ErrorIf(self, cond, *args, **kwargs):
1373
    """Log an error message if the passed condition is True.
1374

1375
    """
1376
    cond = bool(cond) or self.op.debug_simulate_errors
1377
    if cond:
1378
      self._Error(*args, **kwargs)
1379
    # do not mark the operation as failed for WARN cases only
1380
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1381
      self.bad = self.bad or cond
1382

    
1383
  def _VerifyNode(self, ninfo, nresult):
1384
    """Perform some basic validation on data returned from a node.
1385

1386
      - check the result data structure is well formed and has all the
1387
        mandatory fields
1388
      - check ganeti version
1389

1390
    @type ninfo: L{objects.Node}
1391
    @param ninfo: the node to check
1392
    @param nresult: the results from the node
1393
    @rtype: boolean
1394
    @return: whether overall this call was successful (and we can expect
1395
         reasonable values in the response)
1396

1397
    """
1398
    node = ninfo.name
1399
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1400

    
1401
    # main result, nresult should be a non-empty dict
1402
    test = not nresult or not isinstance(nresult, dict)
1403
    _ErrorIf(test, self.ENODERPC, node,
1404
                  "unable to verify node: no data returned")
1405
    if test:
1406
      return False
1407

    
1408
    # compares ganeti version
1409
    local_version = constants.PROTOCOL_VERSION
1410
    remote_version = nresult.get("version", None)
1411
    test = not (remote_version and
1412
                isinstance(remote_version, (list, tuple)) and
1413
                len(remote_version) == 2)
1414
    _ErrorIf(test, self.ENODERPC, node,
1415
             "connection to node returned invalid data")
1416
    if test:
1417
      return False
1418

    
1419
    test = local_version != remote_version[0]
1420
    _ErrorIf(test, self.ENODEVERSION, node,
1421
             "incompatible protocol versions: master %s,"
1422
             " node %s", local_version, remote_version[0])
1423
    if test:
1424
      return False
1425

    
1426
    # node seems compatible, we can actually try to look into its results
1427

    
1428
    # full package version
1429
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1430
                  self.ENODEVERSION, node,
1431
                  "software version mismatch: master %s, node %s",
1432
                  constants.RELEASE_VERSION, remote_version[1],
1433
                  code=self.ETYPE_WARNING)
1434

    
1435
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1436
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1437
      for hv_name, hv_result in hyp_result.iteritems():
1438
        test = hv_result is not None
1439
        _ErrorIf(test, self.ENODEHV, node,
1440
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1441

    
1442
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1443
    if ninfo.vm_capable and isinstance(hvp_result, list):
1444
      for item, hv_name, hv_result in hvp_result:
1445
        _ErrorIf(True, self.ENODEHV, node,
1446
                 "hypervisor %s parameter verify failure (source %s): %s",
1447
                 hv_name, item, hv_result)
1448

    
1449
    test = nresult.get(constants.NV_NODESETUP,
1450
                       ["Missing NODESETUP results"])
1451
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1452
             "; ".join(test))
1453

    
1454
    return True
1455

    
1456
  def _VerifyNodeTime(self, ninfo, nresult,
1457
                      nvinfo_starttime, nvinfo_endtime):
1458
    """Check the node time.
1459

1460
    @type ninfo: L{objects.Node}
1461
    @param ninfo: the node to check
1462
    @param nresult: the remote results for the node
1463
    @param nvinfo_starttime: the start time of the RPC call
1464
    @param nvinfo_endtime: the end time of the RPC call
1465

1466
    """
1467
    node = ninfo.name
1468
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1469

    
1470
    ntime = nresult.get(constants.NV_TIME, None)
1471
    try:
1472
      ntime_merged = utils.MergeTime(ntime)
1473
    except (ValueError, TypeError):
1474
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1475
      return
1476

    
1477
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1478
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1479
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1480
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1481
    else:
1482
      ntime_diff = None
1483

    
1484
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1485
             "Node time diverges by at least %s from master node time",
1486
             ntime_diff)
1487

    
1488
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM data.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
1526
    """Check the node time.
1527

1528
    @type ninfo: L{objects.Node}
1529
    @param ninfo: the node to check
1530
    @param nresult: the remote results for the node
1531

1532
    """
1533
    node = ninfo.name
1534
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1535

    
1536
    test = constants.NV_NODELIST not in nresult
1537
    _ErrorIf(test, self.ENODESSH, node,
1538
             "node hasn't returned node ssh connectivity data")
1539
    if not test:
1540
      if nresult[constants.NV_NODELIST]:
1541
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1542
          _ErrorIf(True, self.ENODESSH, node,
1543
                   "ssh communication with node '%s': %s", a_node, a_msg)
1544

    
1545
    test = constants.NV_NODENETTEST not in nresult
1546
    _ErrorIf(test, self.ENODENET, node,
1547
             "node hasn't returned node tcp connectivity data")
1548
    if not test:
1549
      if nresult[constants.NV_NODENETTEST]:
1550
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1551
        for anode in nlist:
1552
          _ErrorIf(True, self.ENODENET, node,
1553
                   "tcp communication with node '%s': %s",
1554
                   anode, nresult[constants.NV_NODENETTEST][anode])
1555

    
1556
    test = constants.NV_MASTERIP not in nresult
1557
    _ErrorIf(test, self.ENODENET, node,
1558
             "node hasn't returned node master IP reachability data")
1559
    if not test:
1560
      if not nresult[constants.NV_MASTERIP]:
1561
        if node == self.master_node:
1562
          msg = "the master node cannot reach the master IP (not configured?)"
1563
        else:
1564
          msg = "cannot reach the master IP"
1565
        _ErrorIf(True, self.ENODENET, node, msg)
1566

    
1567
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1568
                      diskstatus):
1569
    """Verify an instance.
1570

1571
    This function checks to see if the required block devices are
1572
    available on the instance's node.
1573

1574
    """
1575
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1576
    node_current = instanceconfig.primary_node
1577

    
1578
    node_vol_should = {}
1579
    instanceconfig.MapLVsByNode(node_vol_should)
1580

    
1581
    for node in node_vol_should:
1582
      n_img = node_image[node]
1583
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1584
        # ignore missing volumes on offline or broken nodes
1585
        continue
1586
      for volume in node_vol_should[node]:
1587
        test = volume not in n_img.volumes
1588
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1589
                 "volume %s missing on node %s", volume, node)
1590

    
1591
    if instanceconfig.admin_up:
1592
      pri_img = node_image[node_current]
1593
      test = instance not in pri_img.instances and not pri_img.offline
1594
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1595
               "instance not running on its primary node %s",
1596
               node_current)
1597

    
1598
    for node, n_img in node_image.items():
1599
      if node != node_current:
1600
        test = instance in n_img.instances
1601
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1602
                 "instance should not run on node %s", node)
1603

    
1604
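    # flatten diskstatus ({node: [(success, status), ...]}) into a single
    # list of (node, success, status, disk_index) tuples for the checks below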
    diskdata = [(nname, success, status, idx)
1605
                for (nname, disks) in diskstatus.items()
1606
                for idx, (success, status) in enumerate(disks)]
1607

    
1608
    for nname, success, bdev_status, idx in diskdata:
1609
      # the 'ghost node' construction in Exec() ensures that we have a
1610
      # node here
1611
      snode = node_image[nname]
1612
      bad_snode = snode.ghost or snode.offline
1613
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
1614
               self.EINSTANCEFAULTYDISK, instance,
1615
               "couldn't retrieve status for disk/%s on %s: %s",
1616
               idx, nname, bdev_status)
1617
      _ErrorIf((instanceconfig.admin_up and success and
1618
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1619
               self.EINSTANCEFAULTYDISK, instance,
1620
               "disk/%s on %s is faulty", idx, nname)
1621

    
1622
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1623
    """Verify if there are any unknown volumes in the cluster.
1624

1625
    The .os, .swap and backup volumes are ignored. All other volumes are
1626
    reported as unknown.
1627

1628
    @type reserved: L{ganeti.utils.FieldSet}
1629
    @param reserved: a FieldSet of reserved volume names
1630

1631
    """
1632
    for node, n_img in node_image.items():
1633
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1634
        # skip non-healthy nodes
1635
        continue
1636
      for volume in n_img.volumes:
1637
        test = ((node not in node_vol_should or
1638
                volume not in node_vol_should[node]) and
1639
                not reserved.Matches(volume))
1640
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1641
                      "volume %s is unknown", volume)
1642

    
1643
  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1656
    """Verify N+1 Memory Resilience.
1657

1658
    Check that if one single node dies we can still start all the
1659
    instances it was primary for.
1660

1661
    """
1662
    cluster_info = self.cfg.GetClusterInfo()
1663
    for node, n_img in node_image.items():
1664
      # This code checks that every node which is now listed as
1665
      # secondary has enough memory to host all instances it is
1666
      # supposed to should a single other node in the cluster fail.
1667
      # FIXME: not ready for failover to an arbitrary node
1668
      # FIXME: does not support file-backed instances
1669
      # WARNING: we currently take into account down instances as well
1670
      # as up ones, considering that even if they're down someone
1671
      # might want to start them even in the event of a node failure.
1672
      if n_img.offline:
1673
        # we're skipping offline nodes from the N+1 warning, since
1674
        # most likely we don't have good memory information from them;
1675
        # we already list instances living on such nodes, and that's
1676
        # enough warning
1677
        continue
1678
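      # n_img.sbp maps each primary node to the instances that use this
      # node as secondary; their configured memory must fit into this
      # node's free memory should that primary fail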
      for prinode, instances in n_img.sbp.items():
1679
        needed_mem = 0
1680
        for instance in instances:
1681
          bep = cluster_info.FillBE(instance_cfg[instance])
1682
          if bep[constants.BE_AUTO_BALANCE]:
1683
            needed_mem += bep[constants.BE_MEMORY]
1684
        test = n_img.mfree < needed_mem
1685
        self._ErrorIf(test, self.ENODEN1, node,
1686
                      "not enough memory to accomodate instance failovers"
1687
                      " should node %s fail (%dMiB needed, %dMiB available)",
1688
                      prinode, needed_mem, n_img.mfree)
1689

    
1690
  @classmethod
1691
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1692
                   (files_all, files_all_opt, files_mc, files_vm)):
1693
    """Verifies file checksums collected from all nodes.
1694

1695
    @param errorif: Callback for reporting errors
1696
    @param nodeinfo: List of L{objects.Node} objects
1697
    @param master_node: Name of master node
1698
    @param all_nvinfo: RPC results
1699

1700
    """
1701
    node_names = frozenset(node.name for node in nodeinfo)
1702

    
1703
    assert master_node in node_names
1704
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
1705
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
1706
           "Found file listed in more than one file list"
1707

    
1708
    # Define functions determining which nodes to consider for a file
1709
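    # (fn is None means "check the file on every node"; otherwise only
    # nodes for which fn(node) returns True are expected to have it)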
    file2nodefn = dict([(filename, fn)
1710
      for (files, fn) in [(files_all, None),
1711
                          (files_all_opt, None),
1712
                          (files_mc, lambda node: (node.master_candidate or
1713
                                                   node.name == master_node)),
1714
                          (files_vm, lambda node: node.vm_capable)]
1715
      for filename in files])
1716

    
1717
    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
1718

    
1719
    for node in nodeinfo:
1720
      nresult = all_nvinfo[node.name]
1721

    
1722
      if nresult.fail_msg or not nresult.payload:
1723
        node_files = None
1724
      else:
1725
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
1726

    
1727
      test = not (node_files and isinstance(node_files, dict))
1728
      errorif(test, cls.ENODEFILECHECK, node.name,
1729
              "Node did not return file checksum data")
1730
      if test:
1731
        continue
1732

    
1733
      for (filename, checksum) in node_files.items():
1734
        # Check if the file should be considered for a node
1735
        fn = file2nodefn[filename]
1736
        if fn is None or fn(node):
1737
          fileinfo[filename].setdefault(checksum, set()).add(node.name)
1738

    
1739
    for (filename, checksums) in fileinfo.items():
1740
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1741

    
1742
      # Nodes having the file
1743
      with_file = frozenset(node_name
1744
                            for nodes in fileinfo[filename].values()
1745
                            for node_name in nodes)
1746

    
1747
      # Nodes missing file
1748
      missing_file = node_names - with_file
1749

    
1750
      if filename in files_all_opt:
1751
        # All or no nodes
1752
        errorif(missing_file and missing_file != node_names,
1753
                cls.ECLUSTERFILECHECK, None,
1754
                "File %s is optional, but it must exist on all or no nodes (not"
1755
                " found on %s)",
1756
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
1757
      else:
1758
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
1759
                "File %s is missing from node(s) %s", filename,
1760
                utils.CommaJoin(utils.NiceSort(missing_file)))
1761

    
1762
      # See if there are multiple versions of the file
1763
      test = len(checksums) > 1
1764
      if test:
1765
        variants = ["variant %s on %s" %
1766
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
1767
                    for (idx, (checksum, nodes)) in
1768
                      enumerate(sorted(checksums.items()))]
1769
      else:
1770
        variants = []
1771

    
1772
      errorif(test, cls.ECLUSTERFILECHECK, None,
1773
              "File %s found with %s different checksums (%s)",
1774
              filename, len(checksums), "; ".join(variants))
1775

    
1776
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1777
                      drbd_map):
1778
    """Verifies and the node DRBD status.
1779

1780
    @type ninfo: L{objects.Node}
1781
    @param ninfo: the node to check
1782
    @param nresult: the remote results for the node
1783
    @param instanceinfo: the dict of instances
1784
    @param drbd_helper: the configured DRBD usermode helper
1785
    @param drbd_map: the DRBD map as returned by
1786
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1787

1788
    """
1789
    node = ninfo.name
1790
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1791

    
1792
    if drbd_helper:
1793
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1794
      test = (helper_result is None)
1795
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1796
               "no drbd usermode helper returned")
1797
      if helper_result:
1798
        status, payload = helper_result
1799
        test = not status
1800
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1801
                 "drbd usermode helper check unsuccessful: %s", payload)
1802
        test = status and (payload != drbd_helper)
1803
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1804
                 "wrong drbd usermode helper: %s", payload)
1805

    
1806
    # compute the DRBD minors
1807
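    # node_drbd maps each minor reserved on this node to the owning
    # instance's name and whether that instance should be running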
    node_drbd = {}
1808
    for minor, instance in drbd_map[node].items():
1809
      test = instance not in instanceinfo
1810
      _ErrorIf(test, self.ECLUSTERCFG, None,
1811
               "ghost instance '%s' in temporary DRBD map", instance)
1812
        # ghost instance should not be running, but otherwise we
1813
        # don't give double warnings (both ghost instance and
1814
        # unallocated minor in use)
1815
      if test:
1816
        node_drbd[minor] = (instance, False)
1817
      else:
1818
        instance = instanceinfo[instance]
1819
        node_drbd[minor] = (instance.name, instance.admin_up)
1820

    
1821
    # and now check them
1822
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1823
    test = not isinstance(used_minors, (tuple, list))
1824
    _ErrorIf(test, self.ENODEDRBD, node,
1825
             "cannot parse drbd status file: %s", str(used_minors))
1826
    if test:
1827
      # we cannot check drbd status
1828
      return
1829

    
1830
    for minor, (iname, must_exist) in node_drbd.items():
1831
      test = minor not in used_minors and must_exist
1832
      _ErrorIf(test, self.ENODEDRBD, node,
1833
               "drbd minor %d of instance %s is not active", minor, iname)
1834
    for minor in used_minors:
1835
      test = minor not in node_drbd
1836
      _ErrorIf(test, self.ENODEDRBD, node,
1837
               "unallocated drbd minor %d is in use", minor)
1838

    
1839
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1840
    """Builds the node OS structures.
1841

1842
    @type ninfo: L{objects.Node}
1843
    @param ninfo: the node to check
1844
    @param nresult: the remote results for the node
1845
    @param nimg: the node image object
1846

1847
    """
1848
    node = ninfo.name
1849
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1850

    
1851
    remote_os = nresult.get(constants.NV_OSLIST, None)
1852
    test = (not isinstance(remote_os, list) or
1853
            not compat.all(isinstance(v, list) and len(v) == 7
1854
                           for v in remote_os))
1855

    
1856
    _ErrorIf(test, self.ENODEOS, node,
1857
             "node hasn't returned valid OS data")
1858

    
1859
    nimg.os_fail = test
1860

    
1861
    if test:
1862
      return
1863

    
1864
    os_dict = {}
1865

    
1866
    for (name, os_path, status, diagnose,
1867
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1868

    
1869
      if name not in os_dict:
1870
        os_dict[name] = []
1871

    
1872
      # parameters is a list of lists instead of list of tuples due to
1873
      # JSON lacking a real tuple type, fix it:
1874
      parameters = [tuple(v) for v in parameters]
1875
      os_dict[name].append((os_path, status, diagnose,
1876
                            set(variants), set(parameters), set(api_ver)))
1877

    
1878
    nimg.oslist = os_dict
1879

    
1880
  def _VerifyNodeOS(self, ninfo, nimg, base):
1881
    """Verifies the node OS list.
1882

1883
    @type ninfo: L{objects.Node}
1884
    @param ninfo: the node to check
1885
    @param nimg: the node image object
1886
    @param base: the 'template' node we match against (e.g. from the master)
1887

1888
    """
1889
    node = ninfo.name
1890
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1891

    
1892
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1893

    
1894
    for os_name, os_data in nimg.oslist.items():
1895
      assert os_data, "Empty OS status for OS %s?!" % os_name
1896
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1897
      _ErrorIf(not f_status, self.ENODEOS, node,
1898
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1899
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1900
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1901
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1902
      # this will be caught in the backend too
1903
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1904
               and not f_var, self.ENODEOS, node,
1905
               "OS %s with API at least %d does not declare any variant",
1906
               os_name, constants.OS_API_V15)
1907
      # comparisons with the 'base' image
1908
      test = os_name not in base.oslist
1909
      _ErrorIf(test, self.ENODEOS, node,
1910
               "Extra OS %s not present on reference node (%s)",
1911
               os_name, base.name)
1912
      if test:
1913
        continue
1914
      assert base.oslist[os_name], "Base node has empty OS status?"
1915
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1916
      if not b_status:
1917
        # base OS is invalid, skipping
1918
        continue
1919
      for kind, a, b in [("API version", f_api, b_api),
1920
                         ("variants list", f_var, b_var),
1921
                         ("parameters", f_param, b_param)]:
1922
        _ErrorIf(a != b, self.ENODEOS, node,
1923
                 "OS %s %s differs from reference node %s: %s vs. %s",
1924
                 kind, os_name, base.name,
1925
                 utils.CommaJoin(a), utils.CommaJoin(b))
1926

    
1927
    # check any missing OSes
1928
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1929
    _ErrorIf(missing, self.ENODEOS, node,
1930
             "OSes present on reference node %s but missing on this node: %s",
1931
             base.name, utils.CommaJoin(missing))
1932

    
1933
  def _VerifyOob(self, ninfo, nresult):
1934
    """Verifies out of band functionality of a node.
1935

1936
    @type ninfo: L{objects.Node}
1937
    @param ninfo: the node to check
1938
    @param nresult: the remote results for the node
1939

1940
    """
1941
    node = ninfo.name
1942
    # We just have to verify the paths on master and/or master candidates
1943
    # as the oob helper is invoked on the master
1944
    if ((ninfo.master_candidate or ninfo.master_capable) and
1945
        constants.NV_OOB_PATHS in nresult):
1946
      for path_result in nresult[constants.NV_OOB_PATHS]:
1947
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
1948

    
1949
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1950
    """Verifies and updates the node volume data.
1951

1952
    This function will update a L{NodeImage}'s internal structures
1953
    with data from the remote call.
1954

1955
    @type ninfo: L{objects.Node}
1956
    @param ninfo: the node to check
1957
    @param nresult: the remote results for the node
1958
    @param nimg: the node image object
1959
    @param vg_name: the configured VG name
1960

1961
    """
1962
    node = ninfo.name
1963
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1964

    
1965
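    # assume failure until the LV data has been received and parsed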
    nimg.lvm_fail = True
1966
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1967
    if vg_name is None:
1968
      pass
1969
    elif isinstance(lvdata, basestring):
1970
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1971
               utils.SafeEncode(lvdata))
1972
    elif not isinstance(lvdata, dict):
1973
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1974
    else:
1975
      nimg.volumes = lvdata
1976
      nimg.lvm_fail = False
1977

    
1978
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1979
    """Verifies and updates the node instance list.
1980

1981
    If the listing was successful, then updates this node's instance
1982
    list. Otherwise, it marks the RPC call as failed for the instance
1983
    list key.
1984

1985
    @type ninfo: L{objects.Node}
1986
    @param ninfo: the node to check
1987
    @param nresult: the remote results for the node
1988
    @param nimg: the node image object
1989

1990
    """
1991
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1992
    test = not isinstance(idata, list)
1993
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1994
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1995
    if test:
1996
      nimg.hyp_fail = True
1997
    else:
1998
      nimg.instances = idata
1999

    
2000
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2001
    """Verifies and computes a node information map
2002

2003
    @type ninfo: L{objects.Node}
2004
    @param ninfo: the node to check
2005
    @param nresult: the remote results for the node
2006
    @param nimg: the node image object
2007
    @param vg_name: the configured VG name
2008

2009
    """
2010
    node = ninfo.name
2011
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2012

    
2013
    # try to read free memory (from the hypervisor)
2014
    hv_info = nresult.get(constants.NV_HVINFO, None)
2015
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2016
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2017
    if not test:
2018
      try:
2019
        nimg.mfree = int(hv_info["memory_free"])
2020
      except (ValueError, TypeError):
2021
        _ErrorIf(True, self.ENODERPC, node,
2022
                 "node returned invalid nodeinfo, check hypervisor")
2023

    
2024
    # FIXME: devise a free space model for file based instances as well
2025
    if vg_name is not None:
2026
      test = (constants.NV_VGLIST not in nresult or
2027
              vg_name not in nresult[constants.NV_VGLIST])
2028
      _ErrorIf(test, self.ENODELVM, node,
2029
               "node didn't return data for the volume group '%s'"
2030
               " - it is either missing or broken", vg_name)
2031
      if not test:
2032
        try:
2033
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2034
        except (ValueError, TypeError):
2035
          _ErrorIf(True, self.ENODERPC, node,
2036
                   "node returned invalid LVM info, check LVM status")
2037

    
2038
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2039
    """Gets per-disk status information for all instances.
2040

2041
    @type nodelist: list of strings
2042
    @param nodelist: Node names
2043
    @type node_image: dict of (name, L{objects.Node})
2044
    @param node_image: Node objects
2045
    @type instanceinfo: dict of (name, L{objects.Instance})
2046
    @param instanceinfo: Instance objects
2047
    @rtype: {instance: {node: [(success, payload)]}}
2048
    @return: a dictionary of per-instance dictionaries with nodes as
2049
        keys and disk information as values; the disk information is a
2050
        list of tuples (success, payload)
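        (illustrative shape only: {"inst1": {"nodeA": [(True, status), ...]}})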
2051

2052
    """
2053
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2054

    
2055
    node_disks = {}
2056
    node_disks_devonly = {}
2057
    diskless_instances = set()
2058
    diskless = constants.DT_DISKLESS
2059

    
2060
    for nname in nodelist:
2061
      node_instances = list(itertools.chain(node_image[nname].pinst,
2062
                                            node_image[nname].sinst))
2063
      diskless_instances.update(inst for inst in node_instances
2064
                                if instanceinfo[inst].disk_template == diskless)
2065
      disks = [(inst, disk)
2066
               for inst in node_instances
2067
               for disk in instanceinfo[inst].disks]
2068

    
2069
      if not disks:
2070
        # No need to collect data
2071
        continue
2072

    
2073
      node_disks[nname] = disks
2074

    
2075
      # Creating copies as SetDiskID below will modify the objects and that can
2076
      # lead to incorrect data returned from nodes
2077
      devonly = [dev.Copy() for (_, dev) in disks]
2078

    
2079
      for dev in devonly:
2080
        self.cfg.SetDiskID(dev, nname)
2081

    
2082
      node_disks_devonly[nname] = devonly
2083

    
2084
    assert len(node_disks) == len(node_disks_devonly)
2085

    
2086
    # Collect data from all nodes with disks
2087
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2088
                                                          node_disks_devonly)
2089

    
2090
    assert len(result) == len(node_disks)
2091

    
2092
    instdisk = {}
2093

    
2094
    for (nname, nres) in result.items():
2095
      disks = node_disks[nname]
2096

    
2097
      if nres.offline:
2098
        # No data from this node
2099
        data = len(disks) * [(False, "node offline")]
2100
      else:
2101
        msg = nres.fail_msg
2102
        _ErrorIf(msg, self.ENODERPC, nname,
2103
                 "while getting disk information: %s", msg)
2104
        if msg:
2105
          # No data from this node
2106
          data = len(disks) * [(False, msg)]
2107
        else:
2108
          data = []
2109
          for idx, i in enumerate(nres.payload):
2110
            if isinstance(i, (tuple, list)) and len(i) == 2:
2111
              data.append(i)
2112
            else:
2113
              logging.warning("Invalid result from node %s, entry %d: %s",
2114
                              nname, idx, i)
2115
              data.append((False, "Invalid result from the remote node"))
2116

    
2117
      for ((inst, _), status) in zip(disks, data):
2118
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2119

    
2120
    # Add empty entries for diskless instances.
2121
    for inst in diskless_instances:
2122
      assert inst not in instdisk
2123
      instdisk[inst] = {}
2124

    
2125
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2126
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2127
                      compat.all(isinstance(s, (tuple, list)) and
2128
                                 len(s) == 2 for s in statuses)
2129
                      for inst, nnames in instdisk.items()
2130
                      for nname, statuses in nnames.items())
2131
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2132

    
2133
    return instdisk
2134

    
2135
  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))

  def BuildHooksEnv(self):
2150
    """Build hooks env.
2151

2152
    Cluster-Verify hooks just ran in the post phase and their failure makes
2153
    the output be logged in the verify output and the verification to fail.
2154

2155
    """
2156
    cfg = self.cfg
2157

    
2158
    env = {
2159
      "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
2160
      }
2161

    
2162
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2163
               for node in cfg.GetAllNodesInfo().values())
2164

    
2165
    return env
2166

    
2167
  def BuildHooksNodes(self):
2168
    """Build hooks nodes.
2169

2170
    """
2171
    return ([], self.cfg.GetNodeList())
2172

    
2173
  def Exec(self, feedback_fn):
2174
    """Verify integrity of cluster, performing various test on nodes.
2175

2176
    """
2177
    # This method has too many local variables. pylint: disable-msg=R0914
2178
    self.bad = False
2179
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2180
    verbose = self.op.verbose
2181
    self._feedback_fn = feedback_fn
2182
    feedback_fn("* Verifying global settings")
2183
    for msg in self.cfg.VerifyConfig():
2184
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2185

    
2186
    # Check the cluster certificates
2187
    for cert_filename in constants.ALL_CERT_FILES:
2188
      (errcode, msg) = _VerifyCertificate(cert_filename)
2189
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2190

    
2191
    vg_name = self.cfg.GetVGName()
2192
    drbd_helper = self.cfg.GetDRBDHelper()
2193
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2194
    cluster = self.cfg.GetClusterInfo()
2195
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2196
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2197
    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
2198
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2199
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2200
                        for iname in instancelist)
2201
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2202
    i_non_redundant = [] # Non redundant instances
2203
    i_non_a_balanced = [] # Non auto-balanced instances
2204
    n_offline = 0 # Count of offline nodes
2205
    n_drained = 0 # Count of nodes being drained
2206
    node_vol_should = {}
2207

    
2208
    # FIXME: verify OS list
2209

    
2210
    # File verification
2211
    filemap = _ComputeAncillaryFiles(cluster, False)
2212

    
2213
    # do local checksums
2214
    master_node = self.master_node = self.cfg.GetMasterNode()
2215
    master_ip = self.cfg.GetMasterIP()
2216

    
2217
    # Compute the set of hypervisor parameters
2218
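    # (three sources are considered: cluster-wide defaults, per-OS overrides
    # and per-instance overrides, each tagged with its origin for reporting)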
    hvp_data = []
2219
    for hv_name in hypervisors:
2220
      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
2221
    for os_name, os_hvp in cluster.os_hvp.items():
2222
      for hv_name, hv_params in os_hvp.items():
2223
        if not hv_params:
2224
          continue
2225
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
2226
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
2227
    # TODO: collapse identical parameter values in a single one
2228
    for instance in instanceinfo.values():
2229
      if not instance.hvparams:
2230
        continue
2231
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
2232
                       cluster.FillHV(instance)))
2233
    # and verify them locally
2234
    self._VerifyHVP(hvp_data)
2235

    
2236
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2237
    node_verify_param = {
2238
      constants.NV_FILELIST:
2239
        utils.UniqueSequence(filename
2240
                             for files in filemap
2241
                             for filename in files),
2242
      constants.NV_NODELIST: [node.name for node in nodeinfo
2243
                              if not node.offline],
2244
      constants.NV_HYPERVISOR: hypervisors,
2245
      constants.NV_HVPARAMS: hvp_data,
2246
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2247
                                  node.secondary_ip) for node in nodeinfo
2248
                                 if not node.offline],
2249
      constants.NV_INSTANCELIST: hypervisors,
2250
      constants.NV_VERSION: None,
2251
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2252
      constants.NV_NODESETUP: None,
2253
      constants.NV_TIME: None,
2254
      constants.NV_MASTERIP: (master_node, master_ip),
2255
      constants.NV_OSLIST: None,
2256
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2257
      }
2258

    
2259
    if vg_name is not None:
2260
      node_verify_param[constants.NV_VGLIST] = None
2261
      node_verify_param[constants.NV_LVLIST] = vg_name
2262
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2263
      node_verify_param[constants.NV_DRBDLIST] = None
2264

    
2265
    if drbd_helper:
2266
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2267

    
2268
    # Build our expected cluster state
2269
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2270
                                                 name=node.name,
2271
                                                 vm_capable=node.vm_capable))
2272
                      for node in nodeinfo)
2273

    
2274
    # Gather OOB paths
2275
    oob_paths = []
2276
    for node in nodeinfo:
2277
      path = _SupportsOob(self.cfg, node)
2278
      if path and path not in oob_paths:
2279
        oob_paths.append(path)
2280

    
2281
    if oob_paths:
2282
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2283

    
2284
    for instance in instancelist:
2285
      inst_config = instanceinfo[instance]
2286

    
2287
      for nname in inst_config.all_nodes:
2288
        if nname not in node_image:
2289
          # ghost node
2290
          gnode = self.NodeImage(name=nname)
2291
          gnode.ghost = True
2292
          node_image[nname] = gnode
2293

    
2294
      inst_config.MapLVsByNode(node_vol_should)
2295

    
2296
      pnode = inst_config.primary_node
2297
      node_image[pnode].pinst.append(instance)
2298

    
2299
      for snode in inst_config.secondary_nodes:
2300
        nimg = node_image[snode]
2301
        nimg.sinst.append(instance)
2302
        if pnode not in nimg.sbp:
2303
          nimg.sbp[pnode] = []
2304
        nimg.sbp[pnode].append(instance)
2305

    
2306
    # At this point, we have the in-memory data structures complete,
2307
    # except for the runtime information, which we'll gather next
2308

    
2309
    # Due to the way our RPC system works, exact response times cannot be
2310
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2311
    # time before and after executing the request, we can at least have a time
2312
    # window.
2313
    nvinfo_starttime = time.time()
2314
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2315
                                           self.cfg.GetClusterName())
2316
    nvinfo_endtime = time.time()
2317

    
2318
    all_drbd_map = self.cfg.ComputeDRBDMap()
2319

    
2320
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2321
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2322

    
2323
    feedback_fn("* Verifying configuration file consistency")
2324
    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
2325

    
2326
    feedback_fn("* Verifying node status")
2327

    
2328
    refos_img = None
2329

    
2330
    for node_i in nodeinfo:
2331
      node = node_i.name
2332
      nimg = node_image[node]
2333

    
2334
      if node_i.offline:
2335
        if verbose:
2336
          feedback_fn("* Skipping offline node %s" % (node,))
2337
        n_offline += 1
2338
        continue
2339

    
2340
      if node == master_node:
2341
        ntype = "master"
2342
      elif node_i.master_candidate:
2343
        ntype = "master candidate"
2344
      elif node_i.drained:
2345
        ntype = "drained"
2346
        n_drained += 1
2347
      else:
2348
        ntype = "regular"
2349
      if verbose:
2350
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2351

    
2352
      msg = all_nvinfo[node].fail_msg
2353
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2354
      if msg:
2355
        nimg.rpc_fail = True
2356
        continue
2357

    
2358
      nresult = all_nvinfo[node].payload
2359

    
2360
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2361
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2362
      self._VerifyNodeNetwork(node_i, nresult)
2363
      self._VerifyOob(node_i, nresult)
2364

    
2365
      if nimg.vm_capable:
2366
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2367
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2368
                             all_drbd_map)
2369

    
2370
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2371
        self._UpdateNodeInstances(node_i, nresult, nimg)
2372
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2373
        self._UpdateNodeOS(node_i, nresult, nimg)
2374
        if not nimg.os_fail:
2375
          if refos_img is None:
2376
            refos_img = nimg
2377
          self._VerifyNodeOS(node_i, nimg, refos_img)
2378

    
2379
    feedback_fn("* Verifying instance status")
2380
    for instance in instancelist:
2381
      if verbose:
2382
        feedback_fn("* Verifying instance %s" % instance)
2383
      inst_config = instanceinfo[instance]
2384
      self._VerifyInstance(instance, inst_config, node_image,
2385
                           instdisk[instance])
2386
      inst_nodes_offline = []
2387

    
2388
      pnode = inst_config.primary_node
2389
      pnode_img = node_image[pnode]
2390
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2391
               self.ENODERPC, pnode, "instance %s, connection to"
2392
               " primary node failed", instance)
2393

    
2394
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2395
               self.EINSTANCEBADNODE, instance,
2396
               "instance is marked as running and lives on offline node %s",
2397
               inst_config.primary_node)
2398

    
2399
      # If the instance is non-redundant we cannot survive losing its primary
2400
      # node, so we are not N+1 compliant. On the other hand we have no disk
2401
      # templates with more than one secondary so that situation is not well
2402
      # supported either.
2403
      # FIXME: does not support file-backed instances
2404
      if not inst_config.secondary_nodes:
2405
        i_non_redundant.append(instance)
2406

    
2407
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2408
               instance, "instance has multiple secondary nodes: %s",
2409
               utils.CommaJoin(inst_config.secondary_nodes),
2410
               code=self.ETYPE_WARNING)
2411

    
2412
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2413
        pnode = inst_config.primary_node
2414
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2415
        instance_groups = {}
2416

    
2417
        for node in instance_nodes:
2418
          instance_groups.setdefault(nodeinfo_byname[node].group,
2419
                                     []).append(node)
2420

    
2421
        pretty_list = [
2422
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2423
          # Sort so that we always list the primary node first.
2424
          for group, nodes in sorted(instance_groups.items(),
2425
                                     key=lambda (_, nodes): pnode in nodes,
2426
                                     reverse=True)]
2427

    
2428
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2429
                      instance, "instance has primary and secondary nodes in"
2430
                      " different groups: %s", utils.CommaJoin(pretty_list),
2431
                      code=self.ETYPE_WARNING)
2432

    
2433
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2434
        i_non_a_balanced.append(instance)
2435

    
2436
      for snode in inst_config.secondary_nodes:
2437
        s_img = node_image[snode]
2438
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2439
                 "instance %s, connection to secondary node failed", instance)
2440

    
2441
        if s_img.offline:
2442
          inst_nodes_offline.append(snode)
2443

    
2444
      # warn that the instance lives on offline nodes
2445
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2446
               "instance has offline secondary node(s) %s",
2447
               utils.CommaJoin(inst_nodes_offline))
2448
      # ... or ghost/non-vm_capable nodes
2449
      for node in inst_config.all_nodes:
2450
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2451
                 "instance lives on ghost node %s", node)
2452
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2453
                 instance, "instance lives on non-vm_capable node %s", node)
2454

    
2455
    feedback_fn("* Verifying orphan volumes")
2456
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2457
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2458

    
2459
    feedback_fn("* Verifying orphan instances")
2460
    self._VerifyOrphanInstances(instancelist, node_image)
2461

    
2462
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2463
      feedback_fn("* Verifying N+1 Memory redundancy")
2464
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2465

    
2466
    feedback_fn("* Other Notes")
2467
    if i_non_redundant:
2468
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2469
                  % len(i_non_redundant))
2470

    
2471
    if i_non_a_balanced:
2472
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2473
                  % len(i_non_a_balanced))
2474

    
2475
    if n_offline:
2476
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2477

    
2478
    if n_drained:
2479
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2480

    
2481
    return not self.bad
2482

    
2483
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2484
    """Analyze the post-hooks' result
2485

2486
    This method analyses the hook result, handles it, and sends some
2487
    nicely-formatted feedback back to the user.
2488

2489
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2490
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2491
    @param hooks_results: the results of the multi-node hooks rpc call
2492
    @param feedback_fn: function used to send feedback back to the caller
2493
    @param lu_result: previous Exec result
2494
    @return: the new Exec result, based on the previous result
2495
        and hook results
2496

2497
    """
2498
    # We only really run POST phase hooks, and are only interested in
2499
    # their results
2500
    if phase == constants.HOOKS_PHASE_POST:
2501
      # Used to change hooks' output to proper indentation
2502
      feedback_fn("* Hooks Results")
2503
      assert hooks_results, "invalid result from hooks"
2504

    
2505
      for node_name in hooks_results:
2506
        res = hooks_results[node_name]
2507
        msg = res.fail_msg
2508
        test = msg and not res.offline
2509
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2510
                      "Communication failure in hooks execution: %s", msg)
2511
        if res.offline or msg:
2512
          # No need to investigate payload if node is offline or gave an error.
2513
          # override manually lu_result here as _ErrorIf only
2514
          # overrides self.bad
2515
          lu_result = 1
2516
          continue
2517
        for script, hkr, output in res.payload:
2518
          test = hkr == constants.HKR_FAIL
2519
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2520
                        "Script %s failed, output:", script)
2521
          if test:
2522
            output = self._HOOKS_INDENT_RE.sub('      ', output)
2523
            feedback_fn("%s" % output)
2524
            lu_result = 0
2525

    
2526
      return lu_result
2527

    
2528

    
2529
class LUClusterVerifyDisks(NoHooksLU):
2530
  """Verifies the cluster disks status.
2531

2532
  """
2533
  REQ_BGL = False
2534

    
2535
  def ExpandNames(self):
2536
    self.needed_locks = {
2537
      locking.LEVEL_NODE: locking.ALL_SET,
2538
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2539
    }
2540
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2541

    
2542
  def Exec(self, feedback_fn):
2543
    """Verify integrity of cluster disks.
2544

2545
    @rtype: tuple of three items
2546
    @return: a tuple of (dict of node-to-node_error, list of instances
2547
        which need activate-disks, dict of instance: (node, volume) for
2548
        missing volumes
2549

2550
    """
2551
    result = res_nodes, res_instances, res_missing = {}, [], {}
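    # res_nodes, res_instances and res_missing above alias the members of
    # result, so filling them also fills the value returned to the caller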
2552

    
2553
    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
2554
    instances = self.cfg.GetAllInstancesInfo().values()
2555

    
2556
    nv_dict = {}
2557
    for inst in instances:
2558
      inst_lvs = {}
2559
      if not inst.admin_up:
2560
        continue
2561
      inst.MapLVsByNode(inst_lvs)
2562
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2563
      for node, vol_list in inst_lvs.iteritems():
2564
        for vol in vol_list:
2565
          nv_dict[(node, vol)] = inst
2566

    
2567
    if not nv_dict:
2568
      return result
2569

    
2570
    node_lvs = self.rpc.call_lv_list(nodes, [])
2571
    for node, node_res in node_lvs.items():
2572
      if node_res.offline:
2573
        continue
2574
      msg = node_res.fail_msg
2575
      if msg:
2576
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2577
        res_nodes[node] = msg
2578
        continue
2579

    
2580
      lvs = node_res.payload
2581
      for lv_name, (_, _, lv_online) in lvs.items():
2582
        inst = nv_dict.pop((node, lv_name), None)
2583
        if (not lv_online and inst is not None
2584
            and inst.name not in res_instances):
2585
          res_instances.append(inst.name)
2586

    
2587
    # any leftover items in nv_dict are missing LVs, let's arrange the
2588
    # data better
2589
    for key, inst in nv_dict.iteritems():
2590
      if inst.name not in res_missing:
2591
        res_missing[inst.name] = []
2592
      res_missing[inst.name].append(key)
2593

    
2594
    return result
2595

    
2596

    
2597
class LUClusterRepairDiskSizes(NoHooksLU):
2598
  """Verifies the cluster disks sizes.
2599

2600
  """
2601
  REQ_BGL = False
2602

    
2603
  def ExpandNames(self):
2604
    if self.op.instances:
2605
      self.wanted_names = []
2606
      for name in self.op.instances:
2607
        full_name = _ExpandInstanceName(self.cfg, name)
2608
        self.wanted_names.append(full_name)
2609
      self.needed_locks = {
2610
        locking.LEVEL_NODE: [],
2611
        locking.LEVEL_INSTANCE: self.wanted_names,
2612
        }
2613
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2614
    else:
2615
      self.wanted_names = None
2616
      self.needed_locks = {
2617
        locking.LEVEL_NODE: locking.ALL_SET,
2618
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2619
        }
2620
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2621

    
2622
  def DeclareLocks(self, level):
2623
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2624
      self._LockInstancesNodes(primary_only=True)
2625

    
2626
  def CheckPrereq(self):
2627
    """Check prerequisites.
2628

2629
    This only checks the optional instance list against the existing names.
2630

2631
    """
2632
    if self.wanted_names is None:
2633
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2634

    
2635
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2636
                             in self.wanted_names]
2637

    
2638
  def _EnsureChildSizes(self, disk):
2639
    """Ensure children of the disk have the needed disk size.
2640

2641
    This is valid mainly for DRBD8 and fixes an issue where the
2642
    children have smaller disk size.
2643

2644
    @param disk: an L{ganeti.objects.Disk} object
2645

2646
    """
2647
    if disk.dev_type == constants.LD_DRBD8:
2648
      assert disk.children, "Empty children for DRBD8?"
2649
      fchild = disk.children[0]
2650
      mismatch = fchild.size < disk.size
2651
      if mismatch:
2652
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2653
                     fchild.size, disk.size)
2654
        fchild.size = disk.size
2655

    
2656
      # and we recurse on this child only, not on the metadev
2657
      return self._EnsureChildSizes(fchild) or mismatch
2658
    else:
2659
      return False
2660

    
2661
  def Exec(self, feedback_fn):
2662
    """Verify the size of cluster disks.
2663

2664
    """
2665
    # TODO: check child disks too
2666
    # TODO: check differences in size between primary/secondary nodes
2667
    per_node_disks = {}
2668
    for instance in self.wanted_instances:
2669
      pnode = instance.primary_node
2670
      if pnode not in per_node_disks:
2671
        per_node_disks[pnode] = []
2672
      for idx, disk in enumerate(instance.disks):
2673
        per_node_disks[pnode].append((instance, idx, disk))
2674

    
2675
    changed = []
2676
    for node, dskl in per_node_disks.items():
2677
      newl = [v[2].Copy() for v in dskl]
2678
      for dsk in newl:
2679
        self.cfg.SetDiskID(dsk, node)
2680
      result = self.rpc.call_blockdev_getsize(node, newl)
2681
      if result.fail_msg:
2682
        self.LogWarning("Failure in blockdev_getsize call to node"
2683
                        " %s, ignoring", node)
2684
        continue
2685
      if len(result.payload) != len(dskl):
2686
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
2687
                        " result.payload=%s", node, len(dskl), result.payload)
2688
        self.LogWarning("Invalid result from node %s, ignoring node results",
2689
                        node)
2690
        continue
2691
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
2692
        if size is None:
2693
          self.LogWarning("Disk %d of instance %s did not return size"
2694
                          " information, ignoring", idx, instance.name)
2695
          continue
2696
        if not isinstance(size, (int, long)):
2697
          self.LogWarning("Disk %d of instance %s did not return valid"
2698
                          " size information, ignoring", idx, instance.name)
2699
          continue
2700
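        # the node reports sizes in bytes; shift down to MiB, the unit used
        # for disk.size in the configuration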
        size = size >> 20
2701
        if size != disk.size:
2702
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2703
                       " correcting: recorded %d, actual %d", idx,
2704
                       instance.name, disk.size, size)
2705
          disk.size = size
2706
          self.cfg.Update(instance, feedback_fn)
2707
          changed.append((instance.name, idx, size))
2708
        if self._EnsureChildSizes(disk):
2709
          self.cfg.Update(instance, feedback_fn)
2710
          changed.append((instance.name, idx, disk.size))
2711
    return changed
2712

    
2713

    
2714
class LUClusterRename(LogicalUnit):
2715
  """Rename the cluster.
2716

2717
  """
2718
  HPATH = "cluster-rename"
2719
  HTYPE = constants.HTYPE_CLUSTER
2720

    
2721
  def BuildHooksEnv(self):
2722
    """Build hooks env.
2723

2724
    """
2725
    return {
2726
      "OP_TARGET": self.cfg.GetClusterName(),
2727
      "NEW_NAME": self.op.name,
2728
      }
2729

    
2730
  def BuildHooksNodes(self):
2731
    """Build hooks nodes.
2732

2733
    """
2734
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
2735

    
2736
  def CheckPrereq(self):
2737
    """Verify that the passed name is a valid one.
2738

2739
    """
2740
    hostname = netutils.GetHostname(name=self.op.name,
2741
                                    family=self.cfg.GetPrimaryIPFamily())
2742

    
2743
    new_name = hostname.name
2744
    self.ip = new_ip = hostname.ip
2745
    old_name = self.cfg.GetClusterName()
2746
    old_ip = self.cfg.GetMasterIP()
2747
    if new_name == old_name and new_ip == old_ip:
2748
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2749
                                 " cluster has changed",
2750
                                 errors.ECODE_INVAL)
2751
    if new_ip != old_ip:
2752
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2753
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2754
                                   " reachable on the network" %
2755
                                   new_ip, errors.ECODE_NOTUNIQUE)
2756

    
2757
    self.op.name = new_name
2758

    
2759
  def Exec(self, feedback_fn):
2760
    """Rename the cluster.
2761

2762
    """
2763
    clustername = self.op.name
2764
    ip = self.ip
2765

    
2766
    # shutdown the master IP
2767
    master = self.cfg.GetMasterNode()
2768
    result = self.rpc.call_node_stop_master(master, False)
2769
    result.Raise("Could not disable the master role")
2770

    
2771
    try:
2772
      cluster = self.cfg.GetClusterInfo()
2773
      cluster.cluster_name = clustername
2774
      cluster.master_ip = ip
2775
      self.cfg.Update(cluster, feedback_fn)
2776

    
2777
      # update the known hosts file
2778
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2779
      node_list = self.cfg.GetOnlineNodeList()
2780
      try:
2781
        node_list.remove(master)
2782
      except ValueError:
2783
        pass
2784
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
2785
    finally:
2786
      result = self.rpc.call_node_start_master(master, False, False)
2787
      msg = result.fail_msg
2788
      if msg:
2789
        self.LogWarning("Could not re-enable the master role on"
2790
                        " the master, please restart manually: %s", msg)
2791

    
2792
    return clustername
2793

    
2794

    
2795
class LUClusterSetParams(LogicalUnit):
2796
  """Change the parameters of the cluster.
2797

2798
  """
2799
  HPATH = "cluster-modify"
2800
  HTYPE = constants.HTYPE_CLUSTER
2801
  REQ_BGL = False
2802

    
2803
  def CheckArguments(self):
2804
    """Check parameters
2805

2806
    """
2807
    if self.op.uid_pool:
2808
      uidpool.CheckUidPool(self.op.uid_pool)
2809

    
2810
    if self.op.add_uids:
2811
      uidpool.CheckUidPool(self.op.add_uids)
2812

    
2813
    if self.op.remove_uids:
2814
      uidpool.CheckUidPool(self.op.remove_uids)
2815

    
2816
  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

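  # Summary of the Exec flow below (descriptive note): the volume group and
  # DRBD helper are applied through the dedicated config setters, while all
  # other parameters are copied onto self.cluster and written out with a
  # single cfg.Update() call.  A master_netdev change additionally brackets
  # that update with a stop/start of the master IP, and a failed restart is
  # only reported as a warning.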
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)

3139
  """Helper for uploading a file and showing warnings.
3140

3141
  """
3142
  if os.path.exists(fname):
3143
    result = lu.rpc.call_upload_file(nodes, fname)
3144
    for to_node, to_result in result.items():
3145
      msg = to_result.fail_msg
3146
      if msg:
3147
        msg = ("Copy of file %s to node %s failed: %s" %
3148
               (fname, to_node, msg))
3149
        lu.proc.LogWarning(msg)
3150

    
3151

    
3152
def _ComputeAncillaryFiles(cluster, redist):
3153
  """Compute files external to Ganeti which need to be consistent.
3154

3155
  @type redist: boolean
3156
  @param redist: Whether to include files which need to be redistributed
3157

3158
  """
3159
  # Compute files for all nodes
3160
  files_all = set([
3161
    constants.SSH_KNOWN_HOSTS_FILE,
3162
    constants.CONFD_HMAC_KEY,
3163
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3164
    ])
3165

    
3166
  if not redist:
3167
    files_all.update(constants.ALL_CERT_FILES)
3168
    files_all.update(ssconf.SimpleStore().GetFileList())
3169

    
3170
  if cluster.modify_etc_hosts:
3171
    files_all.add(constants.ETC_HOSTS)
3172

    
3173
  # Files which must either exist on all nodes or on none
3174
  files_all_opt = set([
3175
    constants.RAPI_USERS_FILE,
3176
    ])
3177

    
3178
  # Files which should only be on master candidates
3179
  files_mc = set()
3180
  if not redist:
3181
    files_mc.add(constants.CLUSTER_CONF_FILE)
3182

    
3183
  # Files which should only be on VM-capable nodes
3184
  files_vm = set(filename
3185
    for hv_name in cluster.enabled_hypervisors
3186
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
3187

    
3188
  # Filenames must be unique
3189
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
3190
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
3191
         "Found file listed in more than one file list"
3192

    
3193
  return (files_all, files_all_opt, files_mc, files_vm)
3194

    
3195

    
3196
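# The redistribution helper below always works on the redist=True file set,
# never copies anything to the master node itself (the master already holds
# the canonical copies) and asserts that neither the cluster configuration
# file nor any master-candidate-only file is pushed from here.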
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)

class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)

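# Behaviour of the polling loop below, in short: mirror status is fetched
# from the primary node via blockdev_getmirrorstatus; up to 10 consecutive
# RPC failures are tolerated (6 seconds apart) before giving up; while
# syncing, the loop sleeps min(60, estimated_time) between polls; once the
# sync looks finished but still degraded, up to 10 extra one-second retries
# are done to rule out a transient state.  The return value is True iff the
# disks ended up non-degraded.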
def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

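# Note on the consistency check below: with ldisk=False the test uses the
# overall is_degraded flag of the block device, with ldisk=True it checks the
# local-storage status (LDS_OKAY) instead.  Child devices are checked
# recursively, always with the default ldisk=False.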
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result

class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if self.op.command in self._SKIP_MASTER:
        if self.master_node in self.op.node_names:
          master_node_obj = self.cfg.GetNodeInfo(self.master_node)
          master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

          if master_oob_handler:
            additional_text = ("Run '%s %s %s' if you want to operate on the"
                               " master regardless") % (master_oob_handler,
                                                        self.op.command,
                                                        self.master_node)
          else:
            additional_text = "The master node does not support out-of-band"

          raise errors.OpPrereqError(("Operating on the master node %s is not"
                                      " allowed for %s\n%s") %
                                     (self.master_node, self.op.command,
                                      additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = [_ExpandNodeName(self.cfg, name)
                            for name in self.op.node_names]
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(self.nodes):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("The payload returned by '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("On node '%s' item '%s' has status '%s'",
                                node.name, item, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

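  # Expected payload per OOB command, as enforced by _CheckPayload below:
  #
  #   OOB_HEALTH       -> list of (item, status) pairs, each status being one
  #                       of constants.OOB_STATUSES
  #   OOB_POWER_STATUS -> dict (Exec reads the OOB_POWER_STATUS_POWERED key
  #                       from it)
  #   OOB_POWER_ON, OOB_POWER_OFF, OOB_POWER_CYCLE -> no payload at all (None)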
  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))

class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (lu.acquired_locks or self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      info.variants = list(variants)
      info.parameters = list(parameters)
      info.api_versions = list(api_versions)

      data[os_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]

class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

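  # Illustrative example of the filter built above (the exact outcome also
  # depends on qlang.MakeSimpleFilter, assumed here to return a false value
  # for an empty name list): when neither "hidden", "blacklisted" nor "valid"
  # is among the requested fields and no names are given, the result is the
  # status filter alone:
  #
  #   [qlang.OP_AND,
  #    [qlang.OP_NOT, [qlang.OP_TRUE, "hidden"]],
  #    [qlang.OP_NOT, [qlang.OP_TRUE, "blacklisted"]],
  #    [qlang.OP_TRUE, "valid"]]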
  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)

class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node '%s', which is about to be removed, was not found"
                      " in the list of all nodes", self.op.node_name)
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

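  # The removal in Exec below proceeds in this order: re-balance the master
  # candidate pool (excluding the node being removed), drop the node from the
  # config/context, run the post-phase hooks while the node is still
  # reachable, ask the node itself to leave the cluster via
  # node_leave_cluster (failures only warn), and finally clean up /etc/hosts
  # on the master and redistribute the ancillary files if etc-hosts
  # management is enabled.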
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    _RunPostHook(self, node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)

class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks[locking.LEVEL_NODE] = 1

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted

  def DeclareLocks(self, lu, level):
    pass

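  # Data gathering in _GetQueryData below: live data (NQ_LIVE) is requested
  # via node_info only for vm_capable nodes, and nodes whose RPC fails simply
  # end up without live data rather than failing the query; NQ_INST builds
  # the primary/secondary instance maps from the config; NQ_OOB and NQ_GROUP
  # are likewise served purely from the configuration.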
  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
                                        lu.cfg.GetHypervisorType())
      live_data = dict((name, nresult.payload)
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())

class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)

class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

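  # The per-volume payload handled in Exec below is a list of dicts with the
  # keys 'dev', 'vg', 'name' and 'size' (as used by the field mapping further
  # down); the "instance" output field is resolved by matching the LV name
  # against each instance's MapLVsByNode() result, falling back to "-" when
  # no owner is found.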
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result

class _InstanceQuery(_QueryBase):
  FIELDS = query.INSTANCE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks[locking.LEVEL_INSTANCE] = 1
    lu.share_locks[locking.LEVEL_NODE] = 1

    if self.names:
      self.wanted = _GetWantedInstances(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.IQ_LIVE in self.requested_data)
    if self.do_locking: