root / lib / cmdlib.py @ 7ee045a0


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import query
58
from ganeti import qlang
59
from ganeti import opcodes
60

    
61
import ganeti.masterd.instance # pylint: disable-msg=W0611
62

    
63

    
64
def _SupportsOob(cfg, node):
65
  """Tells if node supports OOB.
66

67
  @type cfg: L{config.ConfigWriter}
68
  @param cfg: The cluster configuration
69
  @type node: L{objects.Node}
70
  @param node: The node
71
  @return: The OOB script if supported or an empty string otherwise
72

73
  """
74
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
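# Example (illustrative, not part of the original module): if a node's
# ndparams set constants.ND_OOB_PROGRAM to "/usr/lib/ganeti/oob" (a made-up
# path), _SupportsOob(cfg, node) returns that path; otherwise it returns an
# empty string, so callers can simply test the result for truth.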
75

    
76

    
77
class ResultWithJobs:
78
  """Data container for LU results with jobs.
79

80
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
81
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
82
  contained in the C{jobs} attribute and include the job IDs in the opcode
83
  result.
84

85
  """
86
  def __init__(self, jobs, **kwargs):
87
    """Initializes this class.
88

89
    Additional return values can be specified as keyword arguments.
90

91
    @type jobs: list of lists of L{opcodes.OpCode}
92
    @param jobs: A list of lists of opcode objects
93

94
    """
95
    self.jobs = jobs
96
    self.other = kwargs
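  # Illustrative usage (an assumed example, not required by this class): an
  # LU's Exec method could return
  #   ResultWithJobs([[opcodes.OpTestDelay(duration=10)]], msg="queued")
  # so that mcpu submits one job containing a single OpTestDelay opcode and
  # adds its job ID to the opcode result; the extra "msg" keyword simply
  # ends up in self.other.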
97

    
98

    
99
class LogicalUnit(object):
100
  """Logical Unit base class.
101

102
  Subclasses must follow these rules:
103
    - implement ExpandNames
104
    - implement CheckPrereq (except when tasklets are used)
105
    - implement Exec (except when tasklets are used)
106
    - implement BuildHooksEnv
107
    - implement BuildHooksNodes
108
    - redefine HPATH and HTYPE
109
    - optionally redefine their run requirements:
110
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
111

112
  Note that all commands require root permissions.
113

114
  @ivar dry_run_result: the value (if any) that will be returned to the caller
115
      in dry-run mode (signalled by opcode dry_run parameter)
116

117
  """
118
  HPATH = None
119
  HTYPE = None
120
  REQ_BGL = True
121

    
122
  def __init__(self, processor, op, context, rpc):
123
    """Constructor for LogicalUnit.
124

125
    This needs to be overridden in derived classes in order to check op
126
    validity.
127

128
    """
129
    self.proc = processor
130
    self.op = op
131
    self.cfg = context.cfg
132
    self.glm = context.glm
133
    self.context = context
134
    self.rpc = rpc
135
    # Dicts used to declare locking needs to mcpu
136
    self.needed_locks = None
137
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
138
    self.add_locks = {}
139
    self.remove_locks = {}
140
    # Used to force good behavior when calling helper functions
141
    self.recalculate_locks = {}
142
    # logging
143
    self.Log = processor.Log # pylint: disable-msg=C0103
144
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
145
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
146
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
147
    # support for dry-run
148
    self.dry_run_result = None
149
    # support for generic debug attribute
150
    if (not hasattr(self.op, "debug_level") or
151
        not isinstance(self.op.debug_level, int)):
152
      self.op.debug_level = 0
153

    
154
    # Tasklets
155
    self.tasklets = None
156

    
157
    # Validate opcode parameters and set defaults
158
    self.op.Validate(True)
159

    
160
    self.CheckArguments()
161

    
162
  def CheckArguments(self):
163
    """Check syntactic validity for the opcode arguments.
164

165
    This method is for doing a simple syntactic check and ensure
166
    validity of opcode parameters, without any cluster-related
167
    checks. While the same can be accomplished in ExpandNames and/or
168
    CheckPrereq, doing these separately is better because:
169

170
      - ExpandNames is left purely as a lock-related function
171
      - CheckPrereq is run after we have acquired locks (and possibly
172
        waited for them)
173

174
    The function is allowed to change the self.op attribute so that
175
    later methods no longer need to worry about missing parameters.
176

177
    """
178
    pass
179

    
180
  def ExpandNames(self):
181
    """Expand names for this LU.
182

183
    This method is called before starting to execute the opcode, and it should
184
    update all the parameters of the opcode to their canonical form (e.g. a
185
    short node name must be fully expanded after this method has successfully
186
    completed). This way locking, hooks, logging, etc. can work correctly.
187

188
    LUs which implement this method must also populate the self.needed_locks
189
    member, as a dict with lock levels as keys, and a list of needed lock names
190
    as values. Rules:
191

192
      - use an empty dict if you don't need any lock
193
      - if you don't need any lock at a particular level omit that level
194
      - don't put anything for the BGL level
195
      - if you want all locks at a level use locking.ALL_SET as a value
196

197
    If you need to share locks (rather than acquire them exclusively) at one
198
    level you can modify self.share_locks, setting a true value (usually 1) for
199
    that level. By default locks are not shared.
200

201
    This function can also define a list of tasklets, which then will be
202
    executed in order instead of the usual LU-level CheckPrereq and Exec
203
    functions, if those are not defined by the LU.
204

205
    Examples::
206

207
      # Acquire all nodes and one instance
208
      self.needed_locks = {
209
        locking.LEVEL_NODE: locking.ALL_SET,
210
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
211
      }
212
      # Acquire just two nodes
213
      self.needed_locks = {
214
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
215
      }
216
      # Acquire no locks
217
      self.needed_locks = {} # No, you can't leave it to the default value None
218

219
    """
220
    # The implementation of this method is mandatory only if the new LU is
221
    # concurrent, so that old LUs don't need to be changed all at the same
222
    # time.
223
    if self.REQ_BGL:
224
      self.needed_locks = {} # Exclusive LUs don't need locks.
225
    else:
226
      raise NotImplementedError
227

    
228
  def DeclareLocks(self, level):
229
    """Declare LU locking needs for a level
230

231
    While most LUs can just declare their locking needs at ExpandNames time,
232
    sometimes there's the need to calculate some locks after having acquired
233
    the ones before. This function is called just before acquiring locks at a
234
    particular level, but after acquiring the ones at lower levels, and permits
235
    such calculations. It can be used to modify self.needed_locks, and by
236
    default it does nothing.
237

238
    This function is only called if you have something already set in
239
    self.needed_locks for the level.
240

241
    @param level: Locking level which is going to be locked
242
    @type level: member of ganeti.locking.LEVELS
243

244
    """
245

    
246
  def CheckPrereq(self):
247
    """Check prerequisites for this LU.
248

249
    This method should check that the prerequisites for the execution
250
    of this LU are fulfilled. It can do internode communication, but
251
    it should be idempotent - no cluster or system changes are
252
    allowed.
253

254
    The method should raise errors.OpPrereqError in case something is
255
    not fulfilled. Its return value is ignored.
256

257
    This method should also update all the parameters of the opcode to
258
    their canonical form if it hasn't been done by ExpandNames before.
259

260
    """
261
    if self.tasklets is not None:
262
      for (idx, tl) in enumerate(self.tasklets):
263
        logging.debug("Checking prerequisites for tasklet %s/%s",
264
                      idx + 1, len(self.tasklets))
265
        tl.CheckPrereq()
266
    else:
267
      pass
268

    
269
  def Exec(self, feedback_fn):
270
    """Execute the LU.
271

272
    This method should implement the actual work. It should raise
273
    errors.OpExecError for failures that are somewhat dealt with in
274
    code, or expected.
275

276
    """
277
    if self.tasklets is not None:
278
      for (idx, tl) in enumerate(self.tasklets):
279
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
280
        tl.Exec(feedback_fn)
281
    else:
282
      raise NotImplementedError
283

    
284
  def BuildHooksEnv(self):
285
    """Build hooks environment for this LU.
286

287
    @rtype: dict
288
    @return: Dictionary containing the environment that will be used for
289
      running the hooks for this LU. The keys of the dict must not be prefixed
290
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
291
      will extend the environment with additional variables. If no environment
292
      should be defined, an empty dictionary should be returned (not C{None}).
293
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
294
      will not be called.
295

296
    """
297
    raise NotImplementedError
298

    
299
  def BuildHooksNodes(self):
300
    """Build list of nodes to run LU's hooks.
301

302
    @rtype: tuple; (list, list)
303
    @return: Tuple containing a list of node names on which the hook
304
      should run before the execution and a list of node names on which the
305
      hook should run after the execution. No nodes should be returned as an
306
      empty list (and not None).
307
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
308
      will not be called.
309

310
    """
311
    raise NotImplementedError
312

    
313
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
314
    """Notify the LU about the results of its hooks.
315

316
    This method is called every time a hooks phase is executed, and notifies
317
    the Logical Unit about the hooks' result. The LU can then use it to alter
318
    its result based on the hooks.  By default the method does nothing and the
319
    previous result is passed back unchanged, but any LU can override it if it
320
    wants to use the local cluster hook-scripts somehow.
321

322
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
323
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
324
    @param hook_results: the results of the multi-node hooks rpc call
325
    @param feedback_fn: function used to send feedback back to the caller
326
    @param lu_result: the previous Exec result this LU had, or None
327
        in the PRE phase
328
    @return: the new Exec result, based on the previous result
329
        and hook results
330

331
    """
332
    # API must be kept, thus we ignore the unused argument and the
333
    # "could be a function" warnings
334
    # pylint: disable-msg=W0613,R0201
335
    return lu_result
336

    
337
  def _ExpandAndLockInstance(self):
338
    """Helper function to expand and lock an instance.
339

340
    Many LUs that work on an instance take its name in self.op.instance_name
341
    and need to expand it and then declare the expanded name for locking. This
342
    function does it, and then updates self.op.instance_name to the expanded
343
    name. It also initializes needed_locks as a dict, if this hasn't been done
344
    before.
345

346
    """
347
    if self.needed_locks is None:
348
      self.needed_locks = {}
349
    else:
350
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
351
        "_ExpandAndLockInstance called with instance-level locks set"
352
    self.op.instance_name = _ExpandInstanceName(self.cfg,
353
                                                self.op.instance_name)
354
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
355

    
356
  def _LockInstancesNodes(self, primary_only=False):
357
    """Helper function to declare instances' nodes for locking.
358

359
    This function should be called after locking one or more instances to lock
360
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
361
    with all primary or secondary nodes for instances already locked and
362
    present in self.needed_locks[locking.LEVEL_INSTANCE].
363

364
    It should be called from DeclareLocks, and for safety only works if
365
    self.recalculate_locks[locking.LEVEL_NODE] is set.
366

367
    In the future it may grow parameters to just lock some instances' nodes, or
368
    to just lock primary or secondary nodes, if needed.
369

370
    It should be called in DeclareLocks in a way similar to::
371

372
      if level == locking.LEVEL_NODE:
373
        self._LockInstancesNodes()
374

375
    @type primary_only: boolean
376
    @param primary_only: only lock primary nodes of locked instances
377

378
    """
379
    assert locking.LEVEL_NODE in self.recalculate_locks, \
380
      "_LockInstancesNodes helper function called with no nodes to recalculate"
381

    
382
    # TODO: check if we've really been called with the instance locks held
383

    
384
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
385
    # future we might want to have different behaviors depending on the value
386
    # of self.recalculate_locks[locking.LEVEL_NODE]
387
    wanted_nodes = []
388
    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
389
      instance = self.context.cfg.GetInstanceInfo(instance_name)
390
      wanted_nodes.append(instance.primary_node)
391
      if not primary_only:
392
        wanted_nodes.extend(instance.secondary_nodes)
393

    
394
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
395
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
396
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
397
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
398

    
399
    del self.recalculate_locks[locking.LEVEL_NODE]
400

    
401

    
402
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
403
  """Simple LU which runs no hooks.
404

405
  This LU is intended as a parent for other LogicalUnits which will
406
  run no hooks, in order to reduce duplicate code.
407

408
  """
409
  HPATH = None
410
  HTYPE = None
411

    
412
  def BuildHooksEnv(self):
413
    """Empty BuildHooksEnv for NoHooksLu.
414

415
    This just raises an error.
416

417
    """
418
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
419

    
420
  def BuildHooksNodes(self):
421
    """Empty BuildHooksNodes for NoHooksLU.
422

423
    """
424
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
425

    
426

    
427
class Tasklet:
428
  """Tasklet base class.
429

430
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
431
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
432
  tasklets know nothing about locks.
433

434
  Subclasses must follow these rules:
435
    - Implement CheckPrereq
436
    - Implement Exec
437

438
  """
439
  def __init__(self, lu):
440
    self.lu = lu
441

    
442
    # Shortcuts
443
    self.cfg = lu.cfg
444
    self.rpc = lu.rpc
445

    
446
  def CheckPrereq(self):
447
    """Check prerequisites for this tasklets.
448

449
    This method should check whether the prerequisites for the execution of
450
    this tasklet are fulfilled. It can do internode communication, but it
451
    should be idempotent - no cluster or system changes are allowed.
452

453
    The method should raise errors.OpPrereqError in case something is not
454
    fulfilled. Its return value is ignored.
455

456
    This method should also update all parameters to their canonical form if it
457
    hasn't been done before.
458

459
    """
460
    pass
461

    
462
  def Exec(self, feedback_fn):
463
    """Execute the tasklet.
464

465
    This method should implement the actual work. It should raise
466
    errors.OpExecError for failures that are somewhat dealt with in code, or
467
    expected.
468

469
    """
470
    raise NotImplementedError
471

    
472

    
473
class _QueryBase:
474
  """Base for query utility classes.
475

476
  """
477
  #: Attribute holding field definitions
478
  FIELDS = None
479

    
480
  def __init__(self, filter_, fields, use_locking):
481
    """Initializes this class.
482

483
    """
484
    self.use_locking = use_locking
485

    
486
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
487
                             namefield="name")
488
    self.requested_data = self.query.RequestedData()
489
    self.names = self.query.RequestedNames()
490

    
491
    # Sort only if no names were requested
492
    self.sort_by_name = not self.names
493

    
494
    self.do_locking = None
495
    self.wanted = None
496

    
497
  def _GetNames(self, lu, all_names, lock_level):
498
    """Helper function to determine names asked for in the query.
499

500
    """
501
    if self.do_locking:
502
      names = lu.glm.list_owned(lock_level)
503
    else:
504
      names = all_names
505

    
506
    if self.wanted == locking.ALL_SET:
507
      assert not self.names
508
      # caller didn't specify names, so ordering is not important
509
      return utils.NiceSort(names)
510

    
511
    # caller specified names and we must keep the same order
512
    assert self.names
513
    assert not self.do_locking or lu.glm.is_owned(lock_level)
514

    
515
    missing = set(self.wanted).difference(names)
516
    if missing:
517
      raise errors.OpExecError("Some items were removed before retrieving"
518
                               " their data: %s" % missing)
519

    
520
    # Return expanded names
521
    return self.wanted
522

    
523
  def ExpandNames(self, lu):
524
    """Expand names for this query.
525

526
    See L{LogicalUnit.ExpandNames}.
527

528
    """
529
    raise NotImplementedError()
530

    
531
  def DeclareLocks(self, lu, level):
532
    """Declare locks for this query.
533

534
    See L{LogicalUnit.DeclareLocks}.
535

536
    """
537
    raise NotImplementedError()
538

    
539
  def _GetQueryData(self, lu):
540
    """Collects all data for this query.
541

542
    @return: Query data object
543

544
    """
545
    raise NotImplementedError()
546

    
547
  def NewStyleQuery(self, lu):
548
    """Collect data and execute query.
549

550
    """
551
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
552
                                  sort_by_name=self.sort_by_name)
553

    
554
  def OldStyleQuery(self, lu):
555
    """Collect data and execute query.
556

557
    """
558
    return self.query.OldStyleQuery(self._GetQueryData(lu),
559
                                    sort_by_name=self.sort_by_name)
560

    
561

    
562
def _GetWantedNodes(lu, nodes):
563
  """Returns list of checked and expanded node names.
564

565
  @type lu: L{LogicalUnit}
566
  @param lu: the logical unit on whose behalf we execute
567
  @type nodes: list
568
  @param nodes: list of node names or None for all nodes
569
  @rtype: list
570
  @return: the list of nodes, sorted
571
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
572

573
  """
574
  if nodes:
575
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
576

    
577
  return utils.NiceSort(lu.cfg.GetNodeList())
578

    
579

    
580
def _GetWantedInstances(lu, instances):
581
  """Returns list of checked and expanded instance names.
582

583
  @type lu: L{LogicalUnit}
584
  @param lu: the logical unit on whose behalf we execute
585
  @type instances: list
586
  @param instances: list of instance names or None for all instances
587
  @rtype: list
588
  @return: the list of instances, sorted
589
  @raise errors.OpPrereqError: if the instances parameter is wrong type
590
  @raise errors.OpPrereqError: if any of the passed instances is not found
591

592
  """
593
  if instances:
594
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
595
  else:
596
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
597
  return wanted
598

    
599

    
600
def _GetUpdatedParams(old_params, update_dict,
601
                      use_default=True, use_none=False):
602
  """Return the new version of a parameter dictionary.
603

604
  @type old_params: dict
605
  @param old_params: old parameters
606
  @type update_dict: dict
607
  @param update_dict: dict containing new parameter values, or
608
      constants.VALUE_DEFAULT to reset the parameter to its default
609
      value
610
  @param use_default: boolean
611
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
612
      values as 'to be deleted' values
613
  @param use_none: boolean
614
  @type use_none: whether to recognise C{None} values as 'to be
615
      deleted' values
616
  @rtype: dict
617
  @return: the new parameter dictionary
618

619
  """
620
  params_copy = copy.deepcopy(old_params)
621
  for key, val in update_dict.iteritems():
622
    if ((use_default and val == constants.VALUE_DEFAULT) or
623
        (use_none and val is None)):
624
      try:
625
        del params_copy[key]
626
      except KeyError:
627
        pass
628
    else:
629
      params_copy[key] = val
630
  return params_copy
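# Worked example (illustrative values): with old_params={"a": 1, "b": 2} and
# update_dict={"a": constants.VALUE_DEFAULT, "c": 3}, a call with the default
# flags returns {"b": 2, "c": 3}: "a" is removed so it reverts to its default,
# "c" is added and "b" is kept unchanged.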
631

    
632

    
633
def _ReleaseLocks(lu, level, names=None, keep=None):
634
  """Releases locks owned by an LU.
635

636
  @type lu: L{LogicalUnit}
637
  @param level: Lock level
638
  @type names: list or None
639
  @param names: Names of locks to release
640
  @type keep: list or None
641
  @param keep: Names of locks to retain
642

643
  """
644
  assert not (keep is not None and names is not None), \
645
         "Only one of the 'names' and the 'keep' parameters can be given"
646

    
647
  if names is not None:
648
    should_release = names.__contains__
649
  elif keep:
650
    should_release = lambda name: name not in keep
651
  else:
652
    should_release = None
653

    
654
  if should_release:
655
    retain = []
656
    release = []
657

    
658
    # Determine which locks to release
659
    for name in lu.glm.list_owned(level):
660
      if should_release(name):
661
        release.append(name)
662
      else:
663
        retain.append(name)
664

    
665
    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
666

    
667
    # Release just some locks
668
    lu.glm.release(level, names=release)
669

    
670
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
671
  else:
672
    # Release everything
673
    lu.glm.release(level)
674

    
675
    assert not lu.glm.is_owned(level), "No locks should be owned"
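# Typical call patterns (illustrative; the keep list shown is just an example):
#   _ReleaseLocks(lu, locking.LEVEL_NODE, keep=[instance.primary_node])
# releases all node locks except the retained one, while
#   _ReleaseLocks(lu, locking.LEVEL_NODE)
# releases every lock owned at that level.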
676

    
677

    
678
def _RunPostHook(lu, node_name):
679
  """Runs the post-hook for an opcode on a single node.
680

681
  """
682
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
683
  try:
684
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
685
  except:
686
    # pylint: disable-msg=W0702
687
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
688

    
689

    
690
def _CheckOutputFields(static, dynamic, selected):
691
  """Checks whether all selected fields are valid.
692

693
  @type static: L{utils.FieldSet}
694
  @param static: static fields set
695
  @type dynamic: L{utils.FieldSet}
696
  @param dynamic: dynamic fields set
697

698
  """
699
  f = utils.FieldSet()
700
  f.Extend(static)
701
  f.Extend(dynamic)
702

    
703
  delta = f.NonMatching(selected)
704
  if delta:
705
    raise errors.OpPrereqError("Unknown output fields selected: %s"
706
                               % ",".join(delta), errors.ECODE_INVAL)
707

    
708

    
709
def _CheckGlobalHvParams(params):
710
  """Validates that given hypervisor params are not global ones.
711

712
  This will ensure that instances don't get customised versions of
713
  global params.
714

715
  """
716
  used_globals = constants.HVC_GLOBALS.intersection(params)
717
  if used_globals:
718
    msg = ("The following hypervisor parameters are global and cannot"
719
           " be customized at instance level, please modify them at"
720
           " cluster level: %s" % utils.CommaJoin(used_globals))
721
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
722

    
723

    
724
def _CheckNodeOnline(lu, node, msg=None):
725
  """Ensure that a given node is online.
726

727
  @param lu: the LU on behalf of which we make the check
728
  @param node: the node to check
729
  @param msg: if passed, should be a message to replace the default one
730
  @raise errors.OpPrereqError: if the node is offline
731

732
  """
733
  if msg is None:
734
    msg = "Can't use offline node"
735
  if lu.cfg.GetNodeInfo(node).offline:
736
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
737

    
738

    
739
def _CheckNodeNotDrained(lu, node):
740
  """Ensure that a given node is not drained.
741

742
  @param lu: the LU on behalf of which we make the check
743
  @param node: the node to check
744
  @raise errors.OpPrereqError: if the node is drained
745

746
  """
747
  if lu.cfg.GetNodeInfo(node).drained:
748
    raise errors.OpPrereqError("Can't use drained node %s" % node,
749
                               errors.ECODE_STATE)
750

    
751

    
752
def _CheckNodeVmCapable(lu, node):
753
  """Ensure that a given node is vm capable.
754

755
  @param lu: the LU on behalf of which we make the check
756
  @param node: the node to check
757
  @raise errors.OpPrereqError: if the node is not vm capable
758

759
  """
760
  if not lu.cfg.GetNodeInfo(node).vm_capable:
761
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
762
                               errors.ECODE_STATE)
763

    
764

    
765
def _CheckNodeHasOS(lu, node, os_name, force_variant):
766
  """Ensure that a node supports a given OS.
767

768
  @param lu: the LU on behalf of which we make the check
769
  @param node: the node to check
770
  @param os_name: the OS to query about
771
  @param force_variant: whether to ignore variant errors
772
  @raise errors.OpPrereqError: if the node is not supporting the OS
773

774
  """
775
  result = lu.rpc.call_os_get(node, os_name)
776
  result.Raise("OS '%s' not in supported OS list for node %s" %
777
               (os_name, node),
778
               prereq=True, ecode=errors.ECODE_INVAL)
779
  if not force_variant:
780
    _CheckOSVariant(result.payload, os_name)
781

    
782

    
783
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
784
  """Ensure that a node has the given secondary ip.
785

786
  @type lu: L{LogicalUnit}
787
  @param lu: the LU on behalf of which we make the check
788
  @type node: string
789
  @param node: the node to check
790
  @type secondary_ip: string
791
  @param secondary_ip: the ip to check
792
  @type prereq: boolean
793
  @param prereq: whether to throw a prerequisite or an execute error
794
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
795
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
796

797
  """
798
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
799
  result.Raise("Failure checking secondary ip on node %s" % node,
800
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
801
  if not result.payload:
802
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
803
           " please fix and re-run this command" % secondary_ip)
804
    if prereq:
805
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
806
    else:
807
      raise errors.OpExecError(msg)
808

    
809

    
810
def _GetClusterDomainSecret():
811
  """Reads the cluster domain secret.
812

813
  """
814
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
815
                               strict=True)
816

    
817

    
818
def _CheckInstanceDown(lu, instance, reason):
819
  """Ensure that an instance is not running."""
820
  if instance.admin_up:
821
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
822
                               (instance.name, reason), errors.ECODE_STATE)
823

    
824
  pnode = instance.primary_node
825
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
826
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
827
              prereq=True, ecode=errors.ECODE_ENVIRON)
828

    
829
  if instance.name in ins_l.payload:
830
    raise errors.OpPrereqError("Instance %s is running, %s" %
831
                               (instance.name, reason), errors.ECODE_STATE)
832

    
833

    
834
def _ExpandItemName(fn, name, kind):
835
  """Expand an item name.
836

837
  @param fn: the function to use for expansion
838
  @param name: requested item name
839
  @param kind: text description ('Node' or 'Instance')
840
  @return: the resolved (full) name
841
  @raise errors.OpPrereqError: if the item is not found
842

843
  """
844
  full_name = fn(name)
845
  if full_name is None:
846
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
847
                               errors.ECODE_NOENT)
848
  return full_name
849

    
850

    
851
def _ExpandNodeName(cfg, name):
852
  """Wrapper over L{_ExpandItemName} for nodes."""
853
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
854

    
855

    
856
def _ExpandInstanceName(cfg, name):
857
  """Wrapper over L{_ExpandItemName} for instance."""
858
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
859

    
860

    
861
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
862
                          memory, vcpus, nics, disk_template, disks,
863
                          bep, hvp, hypervisor_name):
864
  """Builds instance related env variables for hooks
865

866
  This builds the hook environment from individual variables.
867

868
  @type name: string
869
  @param name: the name of the instance
870
  @type primary_node: string
871
  @param primary_node: the name of the instance's primary node
872
  @type secondary_nodes: list
873
  @param secondary_nodes: list of secondary nodes as strings
874
  @type os_type: string
875
  @param os_type: the name of the instance's OS
876
  @type status: boolean
877
  @param status: the should_run status of the instance
878
  @type memory: string
879
  @param memory: the memory size of the instance
880
  @type vcpus: string
881
  @param vcpus: the count of VCPUs the instance has
882
  @type nics: list
883
  @param nics: list of tuples (ip, mac, mode, link) representing
884
      the NICs the instance has
885
  @type disk_template: string
886
  @param disk_template: the disk template of the instance
887
  @type disks: list
888
  @param disks: the list of (size, mode) pairs
889
  @type bep: dict
890
  @param bep: the backend parameters for the instance
891
  @type hvp: dict
892
  @param hvp: the hypervisor parameters for the instance
893
  @type hypervisor_name: string
894
  @param hypervisor_name: the hypervisor for the instance
895
  @rtype: dict
896
  @return: the hook environment for this instance
897

898
  """
899
  if status:
900
    str_status = "up"
901
  else:
902
    str_status = "down"
903
  env = {
904
    "OP_TARGET": name,
905
    "INSTANCE_NAME": name,
906
    "INSTANCE_PRIMARY": primary_node,
907
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
908
    "INSTANCE_OS_TYPE": os_type,
909
    "INSTANCE_STATUS": str_status,
910
    "INSTANCE_MEMORY": memory,
911
    "INSTANCE_VCPUS": vcpus,
912
    "INSTANCE_DISK_TEMPLATE": disk_template,
913
    "INSTANCE_HYPERVISOR": hypervisor_name,
914
  }
915

    
916
  if nics:
917
    nic_count = len(nics)
918
    for idx, (ip, mac, mode, link) in enumerate(nics):
919
      if ip is None:
920
        ip = ""
921
      env["INSTANCE_NIC%d_IP" % idx] = ip
922
      env["INSTANCE_NIC%d_MAC" % idx] = mac
923
      env["INSTANCE_NIC%d_MODE" % idx] = mode
924
      env["INSTANCE_NIC%d_LINK" % idx] = link
925
      if mode == constants.NIC_MODE_BRIDGED:
926
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
927
  else:
928
    nic_count = 0
929

    
930
  env["INSTANCE_NIC_COUNT"] = nic_count
931

    
932
  if disks:
933
    disk_count = len(disks)
934
    for idx, (size, mode) in enumerate(disks):
935
      env["INSTANCE_DISK%d_SIZE" % idx] = size
936
      env["INSTANCE_DISK%d_MODE" % idx] = mode
937
  else:
938
    disk_count = 0
939

    
940
  env["INSTANCE_DISK_COUNT"] = disk_count
941

    
942
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
943
    for key, value in source.items():
944
      env["INSTANCE_%s_%s" % (kind, key)] = value
945

    
946
  return env
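# Example of the resulting environment (illustrative values): for an instance
# named "inst1" that is up, with one bridged NIC and a single 10240 MB disk,
# the returned dict contains keys such as INSTANCE_NAME="inst1",
# INSTANCE_STATUS="up", INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_MAC,
# INSTANCE_NIC0_BRIDGE, INSTANCE_DISK_COUNT=1 and INSTANCE_DISK0_SIZE=10240;
# the hooks runner later prefixes every key with "GANETI_".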
947

    
948

    
949
def _NICListToTuple(lu, nics):
950
  """Build a list of nic information tuples.
951

952
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
953
  value in LUInstanceQueryData.
954

955
  @type lu:  L{LogicalUnit}
956
  @param lu: the logical unit on whose behalf we execute
957
  @type nics: list of L{objects.NIC}
958
  @param nics: list of nics to convert to hooks tuples
959

960
  """
961
  hooks_nics = []
962
  cluster = lu.cfg.GetClusterInfo()
963
  for nic in nics:
964
    ip = nic.ip
965
    mac = nic.mac
966
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
967
    mode = filled_params[constants.NIC_MODE]
968
    link = filled_params[constants.NIC_LINK]
969
    hooks_nics.append((ip, mac, mode, link))
970
  return hooks_nics
971

    
972

    
973
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
974
  """Builds instance related env variables for hooks from an object.
975

976
  @type lu: L{LogicalUnit}
977
  @param lu: the logical unit on whose behalf we execute
978
  @type instance: L{objects.Instance}
979
  @param instance: the instance for which we should build the
980
      environment
981
  @type override: dict
982
  @param override: dictionary with key/values that will override
983
      our values
984
  @rtype: dict
985
  @return: the hook environment dictionary
986

987
  """
988
  cluster = lu.cfg.GetClusterInfo()
989
  bep = cluster.FillBE(instance)
990
  hvp = cluster.FillHV(instance)
991
  args = {
992
    'name': instance.name,
993
    'primary_node': instance.primary_node,
994
    'secondary_nodes': instance.secondary_nodes,
995
    'os_type': instance.os,
996
    'status': instance.admin_up,
997
    'memory': bep[constants.BE_MEMORY],
998
    'vcpus': bep[constants.BE_VCPUS],
999
    'nics': _NICListToTuple(lu, instance.nics),
1000
    'disk_template': instance.disk_template,
1001
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
1002
    'bep': bep,
1003
    'hvp': hvp,
1004
    'hypervisor_name': instance.hypervisor,
1005
  }
1006
  if override:
1007
    args.update(override)
1008
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1009

    
1010

    
1011
def _AdjustCandidatePool(lu, exceptions):
1012
  """Adjust the candidate pool after node operations.
1013

1014
  """
1015
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1016
  if mod_list:
1017
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1018
               utils.CommaJoin(node.name for node in mod_list))
1019
    for name in mod_list:
1020
      lu.context.ReaddNode(name)
1021
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1022
  if mc_now > mc_max:
1023
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1024
               (mc_now, mc_max))
1025

    
1026

    
1027
def _DecideSelfPromotion(lu, exceptions=None):
1028
  """Decide whether I should promote myself as a master candidate.
1029

1030
  """
1031
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1032
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1033
  # the new node will increase mc_max by one, so:
1034
  mc_should = min(mc_should + 1, cp_size)
1035
  return mc_now < mc_should
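# Worked example (illustrative numbers): with candidate_pool_size=10 and
# GetMasterCandidateStats reporting mc_now=3, mc_should=3, the new node raises
# the target to min(3 + 1, 10) = 4, so 3 < 4 and the function returns True,
# i.e. the node should promote itself.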
1036

    
1037

    
1038
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1039
  """Check that the brigdes needed by a list of nics exist.
1040

1041
  """
1042
  cluster = lu.cfg.GetClusterInfo()
1043
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1044
  brlist = [params[constants.NIC_LINK] for params in paramslist
1045
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1046
  if brlist:
1047
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1048
    result.Raise("Error checking bridges on destination node '%s'" %
1049
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1050

    
1051

    
1052
def _CheckInstanceBridgesExist(lu, instance, node=None):
1053
  """Check that the brigdes needed by an instance exist.
1054

1055
  """
1056
  if node is None:
1057
    node = instance.primary_node
1058
  _CheckNicsBridgesExist(lu, instance.nics, node)
1059

    
1060

    
1061
def _CheckOSVariant(os_obj, name):
1062
  """Check whether an OS name conforms to the os variants specification.
1063

1064
  @type os_obj: L{objects.OS}
1065
  @param os_obj: OS object to check
1066
  @type name: string
1067
  @param name: OS name passed by the user, to check for validity
1068

1069
  """
1070
  if not os_obj.supported_variants:
1071
    return
1072
  variant = objects.OS.GetVariant(name)
1073
  if not variant:
1074
    raise errors.OpPrereqError("OS name must include a variant",
1075
                               errors.ECODE_INVAL)
1076

    
1077
  if variant not in os_obj.supported_variants:
1078
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1079

    
1080

    
1081
def _GetNodeInstancesInner(cfg, fn):
1082
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1083

    
1084

    
1085
def _GetNodeInstances(cfg, node_name):
1086
  """Returns a list of all primary and secondary instances on a node.
1087

1088
  """
1089

    
1090
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1091

    
1092

    
1093
def _GetNodePrimaryInstances(cfg, node_name):
1094
  """Returns primary instances on a node.
1095

1096
  """
1097
  return _GetNodeInstancesInner(cfg,
1098
                                lambda inst: node_name == inst.primary_node)
1099

    
1100

    
1101
def _GetNodeSecondaryInstances(cfg, node_name):
1102
  """Returns secondary instances on a node.
1103

1104
  """
1105
  return _GetNodeInstancesInner(cfg,
1106
                                lambda inst: node_name in inst.secondary_nodes)
1107

    
1108

    
1109
def _GetStorageTypeArgs(cfg, storage_type):
1110
  """Returns the arguments for a storage type.
1111

1112
  """
1113
  # Special case for file storage
1114
  if storage_type == constants.ST_FILE:
1115
    # storage.FileStorage wants a list of storage directories
1116
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1117

    
1118
  return []
1119

    
1120

    
1121
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1122
  faulty = []
1123

    
1124
  for dev in instance.disks:
1125
    cfg.SetDiskID(dev, node_name)
1126

    
1127
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1128
  result.Raise("Failed to get disk status from node %s" % node_name,
1129
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1130

    
1131
  for idx, bdev_status in enumerate(result.payload):
1132
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1133
      faulty.append(idx)
1134

    
1135
  return faulty
1136

    
1137

    
1138
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1139
  """Check the sanity of iallocator and node arguments and use the
1140
  cluster-wide iallocator if appropriate.
1141

1142
  Check that at most one of (iallocator, node) is specified. If none is
1143
  specified, then the LU's opcode's iallocator slot is filled with the
1144
  cluster-wide default iallocator.
1145

1146
  @type iallocator_slot: string
1147
  @param iallocator_slot: the name of the opcode iallocator slot
1148
  @type node_slot: string
1149
  @param node_slot: the name of the opcode target node slot
1150

1151
  """
1152
  node = getattr(lu.op, node_slot, None)
1153
  iallocator = getattr(lu.op, iallocator_slot, None)
1154

    
1155
  if node is not None and iallocator is not None:
1156
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1157
                               errors.ECODE_INVAL)
1158
  elif node is None and iallocator is None:
1159
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1160
    if default_iallocator:
1161
      setattr(lu.op, iallocator_slot, default_iallocator)
1162
    else:
1163
      raise errors.OpPrereqError("No iallocator or node given and no"
1164
                                 " cluster-wide default iallocator found."
1165
                                 " Please specify either an iallocator or a"
1166
                                 " node, or set a cluster-wide default"
1167
                                 " iallocator.")
1168

    
1169

    
1170
class LUClusterPostInit(LogicalUnit):
1171
  """Logical unit for running hooks after cluster initialization.
1172

1173
  """
1174
  HPATH = "cluster-init"
1175
  HTYPE = constants.HTYPE_CLUSTER
1176

    
1177
  def BuildHooksEnv(self):
1178
    """Build hooks env.
1179

1180
    """
1181
    return {
1182
      "OP_TARGET": self.cfg.GetClusterName(),
1183
      }
1184

    
1185
  def BuildHooksNodes(self):
1186
    """Build hooks nodes.
1187

1188
    """
1189
    return ([], [self.cfg.GetMasterNode()])
1190

    
1191
  def Exec(self, feedback_fn):
1192
    """Nothing to do.
1193

1194
    """
1195
    return True
1196

    
1197

    
1198
class LUClusterDestroy(LogicalUnit):
1199
  """Logical unit for destroying the cluster.
1200

1201
  """
1202
  HPATH = "cluster-destroy"
1203
  HTYPE = constants.HTYPE_CLUSTER
1204

    
1205
  def BuildHooksEnv(self):
1206
    """Build hooks env.
1207

1208
    """
1209
    return {
1210
      "OP_TARGET": self.cfg.GetClusterName(),
1211
      }
1212

    
1213
  def BuildHooksNodes(self):
1214
    """Build hooks nodes.
1215

1216
    """
1217
    return ([], [])
1218

    
1219
  def CheckPrereq(self):
1220
    """Check prerequisites.
1221

1222
    This checks whether the cluster is empty.
1223

1224
    Any errors are signaled by raising errors.OpPrereqError.
1225

1226
    """
1227
    master = self.cfg.GetMasterNode()
1228

    
1229
    nodelist = self.cfg.GetNodeList()
1230
    if len(nodelist) != 1 or nodelist[0] != master:
1231
      raise errors.OpPrereqError("There are still %d node(s) in"
1232
                                 " this cluster." % (len(nodelist) - 1),
1233
                                 errors.ECODE_INVAL)
1234
    instancelist = self.cfg.GetInstanceList()
1235
    if instancelist:
1236
      raise errors.OpPrereqError("There are still %d instance(s) in"
1237
                                 " this cluster." % len(instancelist),
1238
                                 errors.ECODE_INVAL)
1239

    
1240
  def Exec(self, feedback_fn):
1241
    """Destroys the cluster.
1242

1243
    """
1244
    master = self.cfg.GetMasterNode()
1245

    
1246
    # Run post hooks on master node before it's removed
1247
    _RunPostHook(self, master)
1248

    
1249
    result = self.rpc.call_node_stop_master(master, False)
1250
    result.Raise("Could not disable the master role")
1251

    
1252
    return master
1253

    
1254

    
1255
def _VerifyCertificate(filename):
1256
  """Verifies a certificate for LUClusterVerify.
1257

1258
  @type filename: string
1259
  @param filename: Path to PEM file
1260

1261
  """
1262
  try:
1263
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1264
                                           utils.ReadFile(filename))
1265
  except Exception, err: # pylint: disable-msg=W0703
1266
    return (LUClusterVerify.ETYPE_ERROR,
1267
            "Failed to load X509 certificate %s: %s" % (filename, err))
1268

    
1269
  (errcode, msg) = \
1270
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1271
                                constants.SSL_CERT_EXPIRATION_ERROR)
1272

    
1273
  if msg:
1274
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1275
  else:
1276
    fnamemsg = None
1277

    
1278
  if errcode is None:
1279
    return (None, fnamemsg)
1280
  elif errcode == utils.CERT_WARNING:
1281
    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
1282
  elif errcode == utils.CERT_ERROR:
1283
    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
1284

    
1285
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1286

    
1287

    
1288
class LUClusterVerify(LogicalUnit):
1289
  """Verifies the cluster status.
1290

1291
  """
1292
  HPATH = "cluster-verify"
1293
  HTYPE = constants.HTYPE_CLUSTER
1294
  REQ_BGL = False
1295

    
1296
  TCLUSTER = "cluster"
1297
  TNODE = "node"
1298
  TINSTANCE = "instance"
1299

    
1300
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1301
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1302
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1303
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1304
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1305
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1306
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1307
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1308
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1309
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1310
  ENODEDRBD = (TNODE, "ENODEDRBD")
1311
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1312
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1313
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1314
  ENODEHV = (TNODE, "ENODEHV")
1315
  ENODELVM = (TNODE, "ENODELVM")
1316
  ENODEN1 = (TNODE, "ENODEN1")
1317
  ENODENET = (TNODE, "ENODENET")
1318
  ENODEOS = (TNODE, "ENODEOS")
1319
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1320
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1321
  ENODERPC = (TNODE, "ENODERPC")
1322
  ENODESSH = (TNODE, "ENODESSH")
1323
  ENODEVERSION = (TNODE, "ENODEVERSION")
1324
  ENODESETUP = (TNODE, "ENODESETUP")
1325
  ENODETIME = (TNODE, "ENODETIME")
1326
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1327

    
1328
  ETYPE_FIELD = "code"
1329
  ETYPE_ERROR = "ERROR"
1330
  ETYPE_WARNING = "WARNING"
1331

    
1332
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1333

    
1334
  class NodeImage(object):
1335
    """A class representing the logical and physical status of a node.
1336

1337
    @type name: string
1338
    @ivar name: the node name to which this object refers
1339
    @ivar volumes: a structure as returned from
1340
        L{ganeti.backend.GetVolumeList} (runtime)
1341
    @ivar instances: a list of running instances (runtime)
1342
    @ivar pinst: list of configured primary instances (config)
1343
    @ivar sinst: list of configured secondary instances (config)
1344
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1345
        instances for which this node is secondary (config)
1346
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1347
    @ivar dfree: free disk, as reported by the node (runtime)
1348
    @ivar offline: the offline status (config)
1349
    @type rpc_fail: boolean
1350
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1351
        not whether the individual keys were correct) (runtime)
1352
    @type lvm_fail: boolean
1353
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1354
    @type hyp_fail: boolean
1355
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1356
    @type ghost: boolean
1357
    @ivar ghost: whether this is a known node or not (config)
1358
    @type os_fail: boolean
1359
    @ivar os_fail: whether the RPC call didn't return valid OS data
1360
    @type oslist: list
1361
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1362
    @type vm_capable: boolean
1363
    @ivar vm_capable: whether the node can host instances
1364

1365
    """
1366
    def __init__(self, offline=False, name=None, vm_capable=True):
1367
      self.name = name
1368
      self.volumes = {}
1369
      self.instances = []
1370
      self.pinst = []
1371
      self.sinst = []
1372
      self.sbp = {}
1373
      self.mfree = 0
1374
      self.dfree = 0
1375
      self.offline = offline
1376
      self.vm_capable = vm_capable
1377
      self.rpc_fail = False
1378
      self.lvm_fail = False
1379
      self.hyp_fail = False
1380
      self.ghost = False
1381
      self.os_fail = False
1382
      self.oslist = {}
1383

    
1384
  def ExpandNames(self):
1385
    self.needed_locks = {
1386
      locking.LEVEL_NODE: locking.ALL_SET,
1387
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1388
    }
1389
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1390

    
1391
  def _Error(self, ecode, item, msg, *args, **kwargs):
1392
    """Format an error message.
1393

1394
    Based on the opcode's error_codes parameter, either format a
1395
    parseable error code, or a simpler error string.
1396

1397
    This must be called only from Exec and functions called from Exec.
1398

1399
    """
1400
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1401
    itype, etxt = ecode
1402
    # first complete the msg
1403
    if args:
1404
      msg = msg % args
1405
    # then format the whole message
1406
    if self.op.error_codes:
1407
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1408
    else:
1409
      if item:
1410
        item = " " + item
1411
      else:
1412
        item = ""
1413
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1414
    # and finally report it via the feedback_fn
1415
    self._feedback_fn("  - %s" % msg)
1416

    
1417
  def _ErrorIf(self, cond, *args, **kwargs):
1418
    """Log an error message if the passed condition is True.
1419

1420
    """
1421
    cond = bool(cond) or self.op.debug_simulate_errors
1422
    if cond:
1423
      self._Error(*args, **kwargs)
1424
    # mark the operation as failed only for errors, not for warnings
1425
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1426
      self.bad = self.bad or cond
1427

    
1428
  def _VerifyNode(self, ninfo, nresult):
1429
    """Perform some basic validation on data returned from a node.
1430

1431
      - check the result data structure is well formed and has all the
1432
        mandatory fields
1433
      - check ganeti version
1434

1435
    @type ninfo: L{objects.Node}
1436
    @param ninfo: the node to check
1437
    @param nresult: the results from the node
1438
    @rtype: boolean
1439
    @return: whether overall this call was successful (and we can expect
1440
         reasonable values in the response)
1441

1442
    """
1443
    node = ninfo.name
1444
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1445

    
1446
    # main result, nresult should be a non-empty dict
1447
    test = not nresult or not isinstance(nresult, dict)
1448
    _ErrorIf(test, self.ENODERPC, node,
1449
                  "unable to verify node: no data returned")
1450
    if test:
1451
      return False
1452

    
1453
    # compares ganeti version
1454
    local_version = constants.PROTOCOL_VERSION
1455
    remote_version = nresult.get("version", None)
1456
    test = not (remote_version and
1457
                isinstance(remote_version, (list, tuple)) and
1458
                len(remote_version) == 2)
1459
    _ErrorIf(test, self.ENODERPC, node,
1460
             "connection to node returned invalid data")
1461
    if test:
1462
      return False
1463

    
1464
    test = local_version != remote_version[0]
1465
    _ErrorIf(test, self.ENODEVERSION, node,
1466
             "incompatible protocol versions: master %s,"
1467
             " node %s", local_version, remote_version[0])
1468
    if test:
1469
      return False
1470

    
1471
    # node seems compatible, we can actually try to look into its results
1472

    
1473
    # full package version
1474
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1475
                  self.ENODEVERSION, node,
1476
                  "software version mismatch: master %s, node %s",
1477
                  constants.RELEASE_VERSION, remote_version[1],
1478
                  code=self.ETYPE_WARNING)
1479

    
1480
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1481
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1482
      for hv_name, hv_result in hyp_result.iteritems():
1483
        test = hv_result is not None
1484
        _ErrorIf(test, self.ENODEHV, node,
1485
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1486

    
1487
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1488
    if ninfo.vm_capable and isinstance(hvp_result, list):
1489
      for item, hv_name, hv_result in hvp_result:
1490
        _ErrorIf(True, self.ENODEHV, node,
1491
                 "hypervisor %s parameter verify failure (source %s): %s",
1492
                 hv_name, item, hv_result)
1493

    
1494
    test = nresult.get(constants.NV_NODESETUP,
1495
                       ["Missing NODESETUP results"])
1496
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1497
             "; ".join(test))
1498

    
1499
    return True
1500

    
1501
  def _VerifyNodeTime(self, ninfo, nresult,
1502
                      nvinfo_starttime, nvinfo_endtime):
1503
    """Check the node time.
1504

1505
    @type ninfo: L{objects.Node}
1506
    @param ninfo: the node to check
1507
    @param nresult: the remote results for the node
1508
    @param nvinfo_starttime: the start time of the RPC call
1509
    @param nvinfo_endtime: the end time of the RPC call
1510

1511
    """
1512
    node = ninfo.name
1513
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1514

    
1515
    ntime = nresult.get(constants.NV_TIME, None)
1516
    try:
1517
      ntime_merged = utils.MergeTime(ntime)
1518
    except (ValueError, TypeError):
1519
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1520
      return
1521

    
1522
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1523
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1524
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1525
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1526
    else:
1527
      ntime_diff = None
1528

    
1529
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1530
             "Node time diverges by at least %s from master node time",
1531
             ntime_diff)
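    # Example (illustrative numbers): if the allowed skew were 150 seconds and
    # the node's merged time is 200 seconds before nvinfo_starttime, then
    # ntime_diff becomes "200.0s" and an ENODETIME error is reported; a node
    # inside the allowed window yields ntime_diff = None and no error.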
1532

    
1533
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1534
    """Check the node time.
1535

1536
    @type ninfo: L{objects.Node}
1537
    @param ninfo: the node to check
1538
    @param nresult: the remote results for the node
1539
    @param vg_name: the configured VG name
1540

1541
    """
1542
    if vg_name is None:
1543
      return
1544

    
1545
    node = ninfo.name
1546
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1547

    
1548
    # checks vg existence and size > 20G
1549
    vglist = nresult.get(constants.NV_VGLIST, None)
1550
    test = not vglist
1551
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1552
    if not test:
1553
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1554
                                            constants.MIN_VG_SIZE)
1555
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1556

    
1557
    # check pv names
1558
    pvlist = nresult.get(constants.NV_PVLIST, None)
1559
    test = pvlist is None
1560
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1561
    if not test:
1562
      # check that ':' is not present in PV names, since it's a
1563
      # special character for lvcreate (denotes the range of PEs to
1564
      # use on the PV)
1565
      for _, pvname, owner_vg in pvlist:
1566
        test = ":" in pvname
1567
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1568
                 " '%s' of VG '%s'", pvname, owner_vg)
1569

    
1570
  def _VerifyNodeNetwork(self, ninfo, nresult):
1571
    """Check the node time.
1572

1573
    @type ninfo: L{objects.Node}
1574
    @param ninfo: the node to check
1575
    @param nresult: the remote results for the node
1576

1577
    """
1578
    node = ninfo.name
1579
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1580

    
1581
    test = constants.NV_NODELIST not in nresult
1582
    _ErrorIf(test, self.ENODESSH, node,
1583
             "node hasn't returned node ssh connectivity data")
1584
    if not test:
1585
      if nresult[constants.NV_NODELIST]:
1586
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1587
          _ErrorIf(True, self.ENODESSH, node,
1588
                   "ssh communication with node '%s': %s", a_node, a_msg)
1589

    
1590
    test = constants.NV_NODENETTEST not in nresult
1591
    _ErrorIf(test, self.ENODENET, node,
1592
             "node hasn't returned node tcp connectivity data")
1593
    if not test:
1594
      if nresult[constants.NV_NODENETTEST]:
1595
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1596
        for anode in nlist:
1597
          _ErrorIf(True, self.ENODENET, node,
1598
                   "tcp communication with node '%s': %s",
1599
                   anode, nresult[constants.NV_NODENETTEST][anode])
1600

    
1601
    test = constants.NV_MASTERIP not in nresult
1602
    _ErrorIf(test, self.ENODENET, node,
1603
             "node hasn't returned node master IP reachability data")
1604
    if not test:
1605
      if not nresult[constants.NV_MASTERIP]:
1606
        if node == self.master_node:
1607
          msg = "the master node cannot reach the master IP (not configured?)"
1608
        else:
1609
          msg = "cannot reach the master IP"
1610
        _ErrorIf(True, self.ENODENET, node, msg)
1611

    
1612
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1613
                      diskstatus):
1614
    """Verify an instance.
1615

1616
    This function checks whether the required block devices are
    available on the instance's nodes.
1618

1619
    """
1620
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1621
    node_current = instanceconfig.primary_node
1622

    
1623
    node_vol_should = {}
1624
    instanceconfig.MapLVsByNode(node_vol_should)
1625

    
1626
    for node in node_vol_should:
1627
      n_img = node_image[node]
1628
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1629
        # ignore missing volumes on offline or broken nodes
1630
        continue
1631
      for volume in node_vol_should[node]:
1632
        test = volume not in n_img.volumes
1633
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1634
                 "volume %s missing on node %s", volume, node)
1635

    
1636
    if instanceconfig.admin_up:
1637
      pri_img = node_image[node_current]
1638
      test = instance not in pri_img.instances and not pri_img.offline
1639
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1640
               "instance not running on its primary node %s",
1641
               node_current)
1642

    
1643
    for node, n_img in node_image.items():
1644
      if node != node_current:
1645
        test = instance in n_img.instances
1646
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1647
                 "instance should not run on node %s", node)
1648

    
1649
    diskdata = [(nname, success, status, idx)
1650
                for (nname, disks) in diskstatus.items()
1651
                for idx, (success, status) in enumerate(disks)]
1652

    
1653
    for nname, success, bdev_status, idx in diskdata:
1654
      # the 'ghost node' construction in Exec() ensures that we have a
1655
      # node here
1656
      snode = node_image[nname]
1657
      bad_snode = snode.ghost or snode.offline
1658
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
1659
               self.EINSTANCEFAULTYDISK, instance,
1660
               "couldn't retrieve status for disk/%s on %s: %s",
1661
               idx, nname, bdev_status)
1662
      _ErrorIf((instanceconfig.admin_up and success and
1663
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1664
               self.EINSTANCEFAULTYDISK, instance,
1665
               "disk/%s on %s is faulty", idx, nname)
1666

    
1667
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1668
    """Verify if there are any unknown volumes in the cluster.
1669

1670
    The .os, .swap and backup volumes are ignored. All other volumes are
1671
    reported as unknown.
1672

1673
    @type reserved: L{ganeti.utils.FieldSet}
1674
    @param reserved: a FieldSet of reserved volume names
1675

1676
    """
1677
    for node, n_img in node_image.items():
1678
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1679
        # skip non-healthy nodes
1680
        continue
1681
      for volume in n_img.volumes:
1682
        test = ((node not in node_vol_should or
1683
                volume not in node_vol_should[node]) and
1684
                not reserved.Matches(volume))
1685
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1686
                      "volume %s is unknown", volume)
1687

    
1688
  def _VerifyOrphanInstances(self, instancelist, node_image):
1689
    """Verify the list of running instances.
1690

1691
    This checks what instances are running but unknown to the cluster.
1692

1693
    """
1694
    for node, n_img in node_image.items():
1695
      for o_inst in n_img.instances:
1696
        test = o_inst not in instancelist
1697
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1698
                      "instance %s on node %s should not exist", o_inst, node)
1699

    
1700
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1701
    """Verify N+1 Memory Resilience.
1702

1703
    Check that if one single node dies we can still start all the
1704
    instances it was primary for.
1705

1706
    """
1707
    cluster_info = self.cfg.GetClusterInfo()
1708
    for node, n_img in node_image.items():
1709
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all the instances it is
      # supposed to take over, should any single other node in the
      # cluster fail.
1712
      # FIXME: not ready for failover to an arbitrary node
1713
      # FIXME: does not support file-backed instances
1714
      # WARNING: we currently take into account down instances as well
1715
      # as up ones, considering that even if they're down someone
1716
      # might want to start them even in the event of a node failure.
1717
      if n_img.offline:
1718
        # we're skipping offline nodes from the N+1 warning, since
1719
        # most likely we don't have good memory information from them;
1720
        # we already list instances living on such nodes, and that's
1721
        # enough warning
1722
        continue
1723
      for prinode, instances in n_img.sbp.items():
1724
        needed_mem = 0
1725
        for instance in instances:
1726
          bep = cluster_info.FillBE(instance_cfg[instance])
1727
          if bep[constants.BE_AUTO_BALANCE]:
1728
            needed_mem += bep[constants.BE_MEMORY]
1729
        test = n_img.mfree < needed_mem
1730
        self._ErrorIf(test, self.ENODEN1, node,
1731
                      "not enough memory to accomodate instance failovers"
1732
                      " should node %s fail (%dMiB needed, %dMiB available)",
1733
                      prinode, needed_mem, n_img.mfree)
1734

    
1735
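  # Illustrative sketch for _VerifyNPlusOneMemory above (hypothetical names
  # and sizes): n_img.sbp maps each primary node to the instances that use
  # the current node as secondary.  For
  #
  #   n_img.sbp = {"node1": ["inst1", "inst2"]}
  #
  # with auto-balanced backend memory of 2048 and 4096 MiB respectively, the
  # loop requires n_img.mfree >= 6144 MiB; otherwise ENODEN1 is reported for
  # this node, naming "node1" as the failing primary.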
  @classmethod
1736
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1737
                   (files_all, files_all_opt, files_mc, files_vm)):
1738
    """Verifies file checksums collected from all nodes.
1739

1740
    @param errorif: Callback for reporting errors
1741
    @param nodeinfo: List of L{objects.Node} objects
1742
    @param master_node: Name of master node
1743
    @param all_nvinfo: RPC results
1744

1745
    """
1746
    node_names = frozenset(node.name for node in nodeinfo)
1747

    
1748
    assert master_node in node_names
1749
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
1750
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
1751
           "Found file listed in more than one file list"
1752

    
1753
    # Define functions determining which nodes to consider for a file
1754
    file2nodefn = dict([(filename, fn)
1755
      for (files, fn) in [(files_all, None),
1756
                          (files_all_opt, None),
1757
                          (files_mc, lambda node: (node.master_candidate or
1758
                                                   node.name == master_node)),
1759
                          (files_vm, lambda node: node.vm_capable)]
1760
      for filename in files])
1761

    
1762
    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
1763

    
1764
    for node in nodeinfo:
1765
      nresult = all_nvinfo[node.name]
1766

    
1767
      if nresult.fail_msg or not nresult.payload:
1768
        node_files = None
1769
      else:
1770
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
1771

    
1772
      test = not (node_files and isinstance(node_files, dict))
1773
      errorif(test, cls.ENODEFILECHECK, node.name,
1774
              "Node did not return file checksum data")
1775
      if test:
1776
        continue
1777

    
1778
      for (filename, checksum) in node_files.items():
1779
        # Check if the file should be considered for a node
1780
        fn = file2nodefn[filename]
1781
        if fn is None or fn(node):
1782
          fileinfo[filename].setdefault(checksum, set()).add(node.name)
1783

    
1784
    for (filename, checksums) in fileinfo.items():
1785
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1786

    
1787
      # Nodes having the file
1788
      with_file = frozenset(node_name
1789
                            for nodes in fileinfo[filename].values()
1790
                            for node_name in nodes)
1791

    
1792
      # Nodes missing file
1793
      missing_file = node_names - with_file
1794

    
1795
      if filename in files_all_opt:
1796
        # All or no nodes
1797
        errorif(missing_file and missing_file != node_names,
1798
                cls.ECLUSTERFILECHECK, None,
1799
                "File %s is optional, but it must exist on all or no nodes (not"
1800
                " found on %s)",
1801
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
1802
      else:
1803
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
1804
                "File %s is missing from node(s) %s", filename,
1805
                utils.CommaJoin(utils.NiceSort(missing_file)))
1806

    
1807
      # See if there are multiple versions of the file
1808
      test = len(checksums) > 1
1809
      if test:
1810
        variants = ["variant %s on %s" %
1811
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
1812
                    for (idx, (checksum, nodes)) in
1813
                      enumerate(sorted(checksums.items()))]
1814
      else:
1815
        variants = []
1816

    
1817
      errorif(test, cls.ECLUSTERFILECHECK, None,
1818
              "File %s found with %s different checksums (%s)",
1819
              filename, len(checksums), "; ".join(variants))
1820

    
1821
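  # Illustrative sketch for _VerifyFiles above (hypothetical file name, nodes
  # and checksums): fileinfo maps each checked file to a dict of
  # checksum -> set of nodes reporting that checksum, e.g.
  #
  #   fileinfo["/var/lib/ganeti/example.conf"] = {
  #     "0123456789abcdef": set(["node1", "node2"]),
  #     "fedcba9876543210": set(["node3"]),
  #   }
  #
  # Two different checksums trigger an ECLUSTERFILECHECK error listing
  # "variant 1 on node1, node2; variant 2 on node3", and any node missing a
  # non-optional file is reported separately.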
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1822
                      drbd_map):
1823
    """Verifies and the node DRBD status.
1824

1825
    @type ninfo: L{objects.Node}
1826
    @param ninfo: the node to check
1827
    @param nresult: the remote results for the node
1828
    @param instanceinfo: the dict of instances
1829
    @param drbd_helper: the configured DRBD usermode helper
1830
    @param drbd_map: the DRBD map as returned by
1831
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1832

1833
    """
1834
    node = ninfo.name
1835
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1836

    
1837
    if drbd_helper:
1838
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1839
      test = (helper_result is None)
1840
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1841
               "no drbd usermode helper returned")
1842
      if helper_result:
1843
        status, payload = helper_result
1844
        test = not status
1845
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1846
                 "drbd usermode helper check unsuccessful: %s", payload)
1847
        test = status and (payload != drbd_helper)
1848
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1849
                 "wrong drbd usermode helper: %s", payload)
1850

    
1851
    # compute the DRBD minors
1852
    node_drbd = {}
1853
    for minor, instance in drbd_map[node].items():
1854
      test = instance not in instanceinfo
1855
      _ErrorIf(test, self.ECLUSTERCFG, None,
1856
               "ghost instance '%s' in temporary DRBD map", instance)
1857
        # ghost instance should not be running, but otherwise we
1858
        # don't give double warnings (both ghost instance and
1859
        # unallocated minor in use)
1860
      if test:
1861
        node_drbd[minor] = (instance, False)
1862
      else:
1863
        instance = instanceinfo[instance]
1864
        node_drbd[minor] = (instance.name, instance.admin_up)
1865

    
1866
    # and now check them
1867
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1868
    test = not isinstance(used_minors, (tuple, list))
1869
    _ErrorIf(test, self.ENODEDRBD, node,
1870
             "cannot parse drbd status file: %s", str(used_minors))
1871
    if test:
1872
      # we cannot check drbd status
1873
      return
1874

    
1875
    for minor, (iname, must_exist) in node_drbd.items():
1876
      test = minor not in used_minors and must_exist
1877
      _ErrorIf(test, self.ENODEDRBD, node,
1878
               "drbd minor %d of instance %s is not active", minor, iname)
1879
    for minor in used_minors:
1880
      test = minor not in node_drbd
1881
      _ErrorIf(test, self.ENODEDRBD, node,
1882
               "unallocated drbd minor %d is in use", minor)
1883

    
1884
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1885
    """Builds the node OS structures.
1886

1887
    @type ninfo: L{objects.Node}
1888
    @param ninfo: the node to check
1889
    @param nresult: the remote results for the node
1890
    @param nimg: the node image object
1891

1892
    """
1893
    node = ninfo.name
1894
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1895

    
1896
    remote_os = nresult.get(constants.NV_OSLIST, None)
1897
    test = (not isinstance(remote_os, list) or
1898
            not compat.all(isinstance(v, list) and len(v) == 7
1899
                           for v in remote_os))
1900

    
1901
    _ErrorIf(test, self.ENODEOS, node,
1902
             "node hasn't returned valid OS data")
1903

    
1904
    nimg.os_fail = test
1905

    
1906
    if test:
1907
      return
1908

    
1909
    os_dict = {}
1910

    
1911
    for (name, os_path, status, diagnose,
1912
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1913

    
1914
      if name not in os_dict:
1915
        os_dict[name] = []
1916

    
1917
      # parameters is a list of lists instead of list of tuples due to
1918
      # JSON lacking a real tuple type, fix it:
1919
      parameters = [tuple(v) for v in parameters]
1920
      os_dict[name].append((os_path, status, diagnose,
1921
                            set(variants), set(parameters), set(api_ver)))
1922

    
1923
    nimg.oslist = os_dict
1924

    
1925
  def _VerifyNodeOS(self, ninfo, nimg, base):
1926
    """Verifies the node OS list.
1927

1928
    @type ninfo: L{objects.Node}
1929
    @param ninfo: the node to check
1930
    @param nimg: the node image object
1931
    @param base: the 'template' node we match against (e.g. from the master)
1932

1933
    """
1934
    node = ninfo.name
1935
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1936

    
1937
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1938

    
1939
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
1940
    for os_name, os_data in nimg.oslist.items():
1941
      assert os_data, "Empty OS status for OS %s?!" % os_name
1942
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1943
      _ErrorIf(not f_status, self.ENODEOS, node,
1944
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1945
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1946
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1947
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1948
      # this will be caught in the backend too
1949
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1950
               and not f_var, self.ENODEOS, node,
1951
               "OS %s with API at least %d does not declare any variant",
1952
               os_name, constants.OS_API_V15)
1953
      # comparisons with the 'base' image
1954
      test = os_name not in base.oslist
1955
      _ErrorIf(test, self.ENODEOS, node,
1956
               "Extra OS %s not present on reference node (%s)",
1957
               os_name, base.name)
1958
      if test:
1959
        continue
1960
      assert base.oslist[os_name], "Base node has empty OS status?"
1961
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1962
      if not b_status:
1963
        # base OS is invalid, skipping
1964
        continue
1965
      for kind, a, b in [("API version", f_api, b_api),
1966
                         ("variants list", f_var, b_var),
1967
                         ("parameters", beautify_params(f_param),
1968
                          beautify_params(b_param))]:
1969
        _ErrorIf(a != b, self.ENODEOS, node,
1970
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
1971
                 kind, os_name, base.name,
1972
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
1973

    
1974
    # check any missing OSes
1975
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1976
    _ErrorIf(missing, self.ENODEOS, node,
1977
             "OSes present on reference node %s but missing on this node: %s",
1978
             base.name, utils.CommaJoin(missing))
1979

    
1980
  def _VerifyOob(self, ninfo, nresult):
1981
    """Verifies out of band functionality of a node.
1982

1983
    @type ninfo: L{objects.Node}
1984
    @param ninfo: the node to check
1985
    @param nresult: the remote results for the node
1986

1987
    """
1988
    node = ninfo.name
1989
    # We just have to verify the paths on master and/or master candidates
1990
    # as the oob helper is invoked on the master
1991
    if ((ninfo.master_candidate or ninfo.master_capable) and
1992
        constants.NV_OOB_PATHS in nresult):
1993
      for path_result in nresult[constants.NV_OOB_PATHS]:
1994
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
1995

    
1996
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1997
    """Verifies and updates the node volume data.
1998

1999
    This function will update a L{NodeImage}'s internal structures
2000
    with data from the remote call.
2001

2002
    @type ninfo: L{objects.Node}
2003
    @param ninfo: the node to check
2004
    @param nresult: the remote results for the node
2005
    @param nimg: the node image object
2006
    @param vg_name: the configured VG name
2007

2008
    """
2009
    node = ninfo.name
2010
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2011

    
2012
    nimg.lvm_fail = True
2013
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2014
    if vg_name is None:
2015
      pass
2016
    elif isinstance(lvdata, basestring):
2017
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2018
               utils.SafeEncode(lvdata))
2019
    elif not isinstance(lvdata, dict):
2020
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2021
    else:
2022
      nimg.volumes = lvdata
2023
      nimg.lvm_fail = False
2024

    
2025
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2026
    """Verifies and updates the node instance list.
2027

2028
    If the listing was successful, then updates this node's instance
2029
    list. Otherwise, it marks the RPC call as failed for the instance
2030
    list key.
2031

2032
    @type ninfo: L{objects.Node}
2033
    @param ninfo: the node to check
2034
    @param nresult: the remote results for the node
2035
    @param nimg: the node image object
2036

2037
    """
2038
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2039
    test = not isinstance(idata, list)
2040
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2041
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2042
    if test:
2043
      nimg.hyp_fail = True
2044
    else:
2045
      nimg.instances = idata
2046

    
2047
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2048
    """Verifies and computes a node information map
2049

2050
    @type ninfo: L{objects.Node}
2051
    @param ninfo: the node to check
2052
    @param nresult: the remote results for the node
2053
    @param nimg: the node image object
2054
    @param vg_name: the configured VG name
2055

2056
    """
2057
    node = ninfo.name
2058
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2059

    
2060
    # try to read free memory (from the hypervisor)
2061
    hv_info = nresult.get(constants.NV_HVINFO, None)
2062
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2063
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2064
    if not test:
2065
      try:
2066
        nimg.mfree = int(hv_info["memory_free"])
2067
      except (ValueError, TypeError):
2068
        _ErrorIf(True, self.ENODERPC, node,
2069
                 "node returned invalid nodeinfo, check hypervisor")
2070

    
2071
    # FIXME: devise a free space model for file based instances as well
2072
    if vg_name is not None:
2073
      test = (constants.NV_VGLIST not in nresult or
2074
              vg_name not in nresult[constants.NV_VGLIST])
2075
      _ErrorIf(test, self.ENODELVM, node,
2076
               "node didn't return data for the volume group '%s'"
2077
               " - it is either missing or broken", vg_name)
2078
      if not test:
2079
        try:
2080
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2081
        except (ValueError, TypeError):
2082
          _ErrorIf(True, self.ENODERPC, node,
2083
                   "node returned invalid LVM info, check LVM status")
2084

    
2085
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2086
    """Gets per-disk status information for all instances.
2087

2088
    @type nodelist: list of strings
2089
    @param nodelist: Node names
2090
    @type node_image: dict of (name, L{NodeImage})
2091
    @param node_image: Node objects
2092
    @type instanceinfo: dict of (name, L{objects.Instance})
2093
    @param instanceinfo: Instance objects
2094
    @rtype: {instance: {node: [(success, payload)]}}
2095
    @return: a dictionary of per-instance dictionaries with nodes as
2096
        keys and disk information as values; the disk information is a
2097
        list of tuples (success, payload)
2098

2099
    """
2100
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2101

    
2102
    node_disks = {}
2103
    node_disks_devonly = {}
2104
    diskless_instances = set()
2105
    diskless = constants.DT_DISKLESS
2106

    
2107
    for nname in nodelist:
2108
      node_instances = list(itertools.chain(node_image[nname].pinst,
2109
                                            node_image[nname].sinst))
2110
      diskless_instances.update(inst for inst in node_instances
2111
                                if instanceinfo[inst].disk_template == diskless)
2112
      disks = [(inst, disk)
2113
               for inst in node_instances
2114
               for disk in instanceinfo[inst].disks]
2115

    
2116
      if not disks:
2117
        # No need to collect data
2118
        continue
2119

    
2120
      node_disks[nname] = disks
2121

    
2122
      # Creating copies as SetDiskID below will modify the objects and that can
2123
      # lead to incorrect data returned from nodes
2124
      devonly = [dev.Copy() for (_, dev) in disks]
2125

    
2126
      for dev in devonly:
2127
        self.cfg.SetDiskID(dev, nname)
2128

    
2129
      node_disks_devonly[nname] = devonly
2130

    
2131
    assert len(node_disks) == len(node_disks_devonly)
2132

    
2133
    # Collect data from all nodes with disks
2134
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2135
                                                          node_disks_devonly)
2136

    
2137
    assert len(result) == len(node_disks)
2138

    
2139
    instdisk = {}
2140

    
2141
    for (nname, nres) in result.items():
2142
      disks = node_disks[nname]
2143

    
2144
      if nres.offline:
2145
        # No data from this node
2146
        data = len(disks) * [(False, "node offline")]
2147
      else:
2148
        msg = nres.fail_msg
2149
        _ErrorIf(msg, self.ENODERPC, nname,
2150
                 "while getting disk information: %s", msg)
2151
        if msg:
2152
          # No data from this node
2153
          data = len(disks) * [(False, msg)]
2154
        else:
2155
          data = []
2156
          for idx, i in enumerate(nres.payload):
2157
            if isinstance(i, (tuple, list)) and len(i) == 2:
2158
              data.append(i)
2159
            else:
2160
              logging.warning("Invalid result from node %s, entry %d: %s",
2161
                              nname, idx, i)
2162
              data.append((False, "Invalid result from the remote node"))
2163

    
2164
      for ((inst, _), status) in zip(disks, data):
2165
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2166

    
2167
    # Add empty entries for diskless instances.
2168
    for inst in diskless_instances:
2169
      assert inst not in instdisk
2170
      instdisk[inst] = {}
2171

    
2172
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2173
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2174
                      compat.all(isinstance(s, (tuple, list)) and
2175
                                 len(s) == 2 for s in statuses)
2176
                      for inst, nnames in instdisk.items()
2177
                      for nname, statuses in nnames.items())
2178
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2179

    
2180
    return instdisk
2181

    
2182
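  # Illustrative sketch of the structure returned by _CollectDiskInfo above
  # (hypothetical instance and node names): one status tuple per disk, per
  # node, and an empty dict for diskless instances, e.g.
  #
  #   instdisk = {
  #     "inst1": {"node1": [(True, bdev_status0), (True, bdev_status1)],
  #               "node2": [(False, "node offline"), (False, "node offline")]},
  #     "inst2": {},     # diskless
  #   }
  #
  # This is the "diskstatus" argument later consumed by _VerifyInstance.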
  def _VerifyHVP(self, hvp_data):
2183
    """Verifies locally the syntax of the hypervisor parameters.
2184

2185
    """
2186
    for item, hv_name, hv_params in hvp_data:
2187
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
2188
             (hv_name, item))
2189
      try:
2190
        hv_class = hypervisor.GetHypervisor(hv_name)
2191
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2192
        hv_class.CheckParameterSyntax(hv_params)
2193
      except errors.GenericError, err:
2194
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
2195

    
2196
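  # Illustrative sketch of the hvp_data argument checked by _VerifyHVP above
  # (hypothetical hypervisor, OS and parameter values): each entry is a
  # (source, hypervisor, parameters) triple collected in Exec, e.g.
  #
  #   hvp_data = [
  #     ("cluster", "kvm", {"kernel_path": "/boot/vmlinuz-3-kvmU"}),
  #     ("os debian-image", "xen-pvm", {"kernel_args": "ro"}),
  #     ("instance web1.example.com", "kvm", {"vnc_bind_address": "0.0.0.0"}),
  #   ]
  #
  # Any ForceDictType or CheckParameterSyntax failure is reported as an
  # ECLUSTERCFG error naming the hypervisor and the source.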
  def BuildHooksEnv(self):
2197
    """Build hooks env.
2198

2199
    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.
2201

2202
    """
2203
    cfg = self.cfg
2204

    
2205
    env = {
2206
      "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
2207
      }
2208

    
2209
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2210
               for node in cfg.GetAllNodesInfo().values())
2211

    
2212
    return env
2213

    
2214
  def BuildHooksNodes(self):
2215
    """Build hooks nodes.
2216

2217
    """
2218
    return ([], self.cfg.GetNodeList())
2219

    
2220
  def Exec(self, feedback_fn):
2221
    """Verify integrity of cluster, performing various test on nodes.
2222

2223
    """
2224
    # This method has too many local variables. pylint: disable-msg=R0914
2225
    self.bad = False
2226
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2227
    verbose = self.op.verbose
2228
    self._feedback_fn = feedback_fn
2229
    feedback_fn("* Verifying global settings")
2230
    for msg in self.cfg.VerifyConfig():
2231
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2232

    
2233
    # Check the cluster certificates
2234
    for cert_filename in constants.ALL_CERT_FILES:
2235
      (errcode, msg) = _VerifyCertificate(cert_filename)
2236
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2237

    
2238
    vg_name = self.cfg.GetVGName()
2239
    drbd_helper = self.cfg.GetDRBDHelper()
2240
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2241
    cluster = self.cfg.GetClusterInfo()
2242
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2243
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2244
    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
2245
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2246
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2247
                        for iname in instancelist)
2248
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2249
    i_non_redundant = [] # Non redundant instances
2250
    i_non_a_balanced = [] # Non auto-balanced instances
2251
    n_offline = 0 # Count of offline nodes
2252
    n_drained = 0 # Count of nodes being drained
2253
    node_vol_should = {}
2254

    
2255
    # FIXME: verify OS list
2256

    
2257
    # File verification
2258
    filemap = _ComputeAncillaryFiles(cluster, False)
2259

    
2260
    # do local checksums
2261
    master_node = self.master_node = self.cfg.GetMasterNode()
2262
    master_ip = self.cfg.GetMasterIP()
2263

    
2264
    # Compute the set of hypervisor parameters
2265
    hvp_data = []
2266
    for hv_name in hypervisors:
2267
      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
2268
    for os_name, os_hvp in cluster.os_hvp.items():
2269
      for hv_name, hv_params in os_hvp.items():
2270
        if not hv_params:
2271
          continue
2272
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
2273
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
2274
    # TODO: collapse identical parameter values in a single one
2275
    for instance in instanceinfo.values():
2276
      if not instance.hvparams:
2277
        continue
2278
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
2279
                       cluster.FillHV(instance)))
2280
    # and verify them locally
2281
    self._VerifyHVP(hvp_data)
2282

    
2283
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2284
    node_verify_param = {
2285
      constants.NV_FILELIST:
2286
        utils.UniqueSequence(filename
2287
                             for files in filemap
2288
                             for filename in files),
2289
      constants.NV_NODELIST: [node.name for node in nodeinfo
2290
                              if not node.offline],
2291
      constants.NV_HYPERVISOR: hypervisors,
2292
      constants.NV_HVPARAMS: hvp_data,
2293
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2294
                                  node.secondary_ip) for node in nodeinfo
2295
                                 if not node.offline],
2296
      constants.NV_INSTANCELIST: hypervisors,
2297
      constants.NV_VERSION: None,
2298
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2299
      constants.NV_NODESETUP: None,
2300
      constants.NV_TIME: None,
2301
      constants.NV_MASTERIP: (master_node, master_ip),
2302
      constants.NV_OSLIST: None,
2303
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2304
      }
2305

    
2306
    if vg_name is not None:
2307
      node_verify_param[constants.NV_VGLIST] = None
2308
      node_verify_param[constants.NV_LVLIST] = vg_name
2309
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2310
      node_verify_param[constants.NV_DRBDLIST] = None
2311

    
2312
    if drbd_helper:
2313
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2314

    
2315
    # Build our expected cluster state
2316
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2317
                                                 name=node.name,
2318
                                                 vm_capable=node.vm_capable))
2319
                      for node in nodeinfo)
2320

    
2321
    # Gather OOB paths
2322
    oob_paths = []
2323
    for node in nodeinfo:
2324
      path = _SupportsOob(self.cfg, node)
2325
      if path and path not in oob_paths:
2326
        oob_paths.append(path)
2327

    
2328
    if oob_paths:
2329
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2330

    
2331
    for instance in instancelist:
2332
      inst_config = instanceinfo[instance]
2333

    
2334
      for nname in inst_config.all_nodes:
2335
        if nname not in node_image:
2336
          # ghost node
2337
          gnode = self.NodeImage(name=nname)
2338
          gnode.ghost = True
2339
          node_image[nname] = gnode
2340

    
2341
      inst_config.MapLVsByNode(node_vol_should)
2342

    
2343
      pnode = inst_config.primary_node
2344
      node_image[pnode].pinst.append(instance)
2345

    
2346
      for snode in inst_config.secondary_nodes:
2347
        nimg = node_image[snode]
2348
        nimg.sinst.append(instance)
2349
        if pnode not in nimg.sbp:
2350
          nimg.sbp[pnode] = []
2351
        nimg.sbp[pnode].append(instance)
2352

    
2353
    # At this point, we have the in-memory data structures complete,
2354
    # except for the runtime information, which we'll gather next
2355

    
2356
    # Due to the way our RPC system works, exact response times cannot be
2357
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2358
    # time before and after executing the request, we can at least have a time
2359
    # window.
2360
    nvinfo_starttime = time.time()
2361
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2362
                                           self.cfg.GetClusterName())
2363
    nvinfo_endtime = time.time()
2364

    
2365
    all_drbd_map = self.cfg.ComputeDRBDMap()
2366

    
2367
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2368
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2369

    
2370
    feedback_fn("* Verifying configuration file consistency")
2371
    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
2372

    
2373
    feedback_fn("* Verifying node status")
2374

    
2375
    refos_img = None
2376

    
2377
    for node_i in nodeinfo:
2378
      node = node_i.name
2379
      nimg = node_image[node]
2380

    
2381
      if node_i.offline:
2382
        if verbose:
2383
          feedback_fn("* Skipping offline node %s" % (node,))
2384
        n_offline += 1
2385
        continue
2386

    
2387
      if node == master_node:
2388
        ntype = "master"
2389
      elif node_i.master_candidate:
2390
        ntype = "master candidate"
2391
      elif node_i.drained:
2392
        ntype = "drained"
2393
        n_drained += 1
2394
      else:
2395
        ntype = "regular"
2396
      if verbose:
2397
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2398

    
2399
      msg = all_nvinfo[node].fail_msg
2400
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2401
      if msg:
2402
        nimg.rpc_fail = True
2403
        continue
2404

    
2405
      nresult = all_nvinfo[node].payload
2406

    
2407
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2408
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2409
      self._VerifyNodeNetwork(node_i, nresult)
2410
      self._VerifyOob(node_i, nresult)
2411

    
2412
      if nimg.vm_capable:
2413
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2414
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2415
                             all_drbd_map)
2416

    
2417
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2418
        self._UpdateNodeInstances(node_i, nresult, nimg)
2419
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2420
        self._UpdateNodeOS(node_i, nresult, nimg)
2421
        if not nimg.os_fail:
2422
          if refos_img is None:
2423
            refos_img = nimg
2424
          self._VerifyNodeOS(node_i, nimg, refos_img)
2425

    
2426
    feedback_fn("* Verifying instance status")
2427
    for instance in instancelist:
2428
      if verbose:
2429
        feedback_fn("* Verifying instance %s" % instance)
2430
      inst_config = instanceinfo[instance]
2431
      self._VerifyInstance(instance, inst_config, node_image,
2432
                           instdisk[instance])
2433
      inst_nodes_offline = []
2434

    
2435
      pnode = inst_config.primary_node
2436
      pnode_img = node_image[pnode]
2437
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2438
               self.ENODERPC, pnode, "instance %s, connection to"
2439
               " primary node failed", instance)
2440

    
2441
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2442
               self.EINSTANCEBADNODE, instance,
2443
               "instance is marked as running and lives on offline node %s",
2444
               inst_config.primary_node)
2445

    
2446
      # If the instance is non-redundant we cannot survive losing its primary
2447
      # node, so we are not N+1 compliant. On the other hand we have no disk
2448
      # templates with more than one secondary so that situation is not well
2449
      # supported either.
2450
      # FIXME: does not support file-backed instances
2451
      if not inst_config.secondary_nodes:
2452
        i_non_redundant.append(instance)
2453

    
2454
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2455
               instance, "instance has multiple secondary nodes: %s",
2456
               utils.CommaJoin(inst_config.secondary_nodes),
2457
               code=self.ETYPE_WARNING)
2458

    
2459
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2460
        pnode = inst_config.primary_node
2461
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2462
        instance_groups = {}
2463

    
2464
        for node in instance_nodes:
2465
          instance_groups.setdefault(nodeinfo_byname[node].group,
2466
                                     []).append(node)
2467

    
2468
        pretty_list = [
2469
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2470
          # Sort so that we always list the primary node first.
2471
          for group, nodes in sorted(instance_groups.items(),
2472
                                     key=lambda (_, nodes): pnode in nodes,
2473
                                     reverse=True)]
2474

    
2475
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2476
                      instance, "instance has primary and secondary nodes in"
2477
                      " different groups: %s", utils.CommaJoin(pretty_list),
2478
                      code=self.ETYPE_WARNING)
2479

    
2480
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2481
        i_non_a_balanced.append(instance)
2482

    
2483
      for snode in inst_config.secondary_nodes:
2484
        s_img = node_image[snode]
2485
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2486
                 "instance %s, connection to secondary node failed", instance)
2487

    
2488
        if s_img.offline:
2489
          inst_nodes_offline.append(snode)
2490

    
2491
      # warn that the instance lives on offline nodes
2492
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2493
               "instance has offline secondary node(s) %s",
2494
               utils.CommaJoin(inst_nodes_offline))
2495
      # ... or ghost/non-vm_capable nodes
2496
      for node in inst_config.all_nodes:
2497
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2498
                 "instance lives on ghost node %s", node)
2499
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2500
                 instance, "instance lives on non-vm_capable node %s", node)
2501

    
2502
    feedback_fn("* Verifying orphan volumes")
2503
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2504
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2505

    
2506
    feedback_fn("* Verifying orphan instances")
2507
    self._VerifyOrphanInstances(instancelist, node_image)
2508

    
2509
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2510
      feedback_fn("* Verifying N+1 Memory redundancy")
2511
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2512

    
2513
    feedback_fn("* Other Notes")
2514
    if i_non_redundant:
2515
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2516
                  % len(i_non_redundant))
2517

    
2518
    if i_non_a_balanced:
2519
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2520
                  % len(i_non_a_balanced))
2521

    
2522
    if n_offline:
2523
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2524

    
2525
    if n_drained:
2526
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2527

    
2528
    return not self.bad
2529

    
2530
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2531
    """Analyze the post-hooks' result
2532

2533
    This method analyses the hook result, handles it, and sends some
2534
    nicely-formatted feedback back to the user.
2535

2536
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2537
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2538
    @param hooks_results: the results of the multi-node hooks rpc call
2539
    @param feedback_fn: function used to send feedback back to the caller
2540
    @param lu_result: previous Exec result
2541
    @return: the new Exec result, based on the previous result
2542
        and hook results
2543

2544
    """
2545
    # We only really run POST phase hooks, and are only interested in
2546
    # their results
2547
    if phase == constants.HOOKS_PHASE_POST:
2548
      # Used to change hooks' output to proper indentation
2549
      feedback_fn("* Hooks Results")
2550
      assert hooks_results, "invalid result from hooks"
2551

    
2552
      for node_name in hooks_results:
2553
        res = hooks_results[node_name]
2554
        msg = res.fail_msg
2555
        test = msg and not res.offline
2556
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2557
                      "Communication failure in hooks execution: %s", msg)
2558
        if res.offline or msg:
2559
          # No need to investigate payload if node is offline or gave an error.
2560
          # manually override lu_result here, as _ErrorIf only
          # overrides self.bad
2562
          lu_result = 1
2563
          continue
2564
        for script, hkr, output in res.payload:
2565
          test = hkr == constants.HKR_FAIL
2566
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2567
                        "Script %s failed, output:", script)
2568
          if test:
2569
            output = self._HOOKS_INDENT_RE.sub('      ', output)
2570
            feedback_fn("%s" % output)
2571
            lu_result = 0
2572

    
2573
      return lu_result


class LUClusterVerifyDisks(NoHooksLU):
2577
  """Verifies the cluster disks status.
2578

2579
  """
2580
  REQ_BGL = False
2581

    
2582
  def ExpandNames(self):
2583
    self.needed_locks = {
2584
      locking.LEVEL_NODE: locking.ALL_SET,
2585
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2586
    }
2587
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2588

    
2589
  def Exec(self, feedback_fn):
2590
    """Verify integrity of cluster disks.
2591

2592
    @rtype: tuple of three items
2593
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)
2596

2597
    """
2598
    result = res_nodes, res_instances, res_missing = {}, [], {}
2599

    
2600
    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
2601
    instances = self.cfg.GetAllInstancesInfo().values()
2602

    
2603
    nv_dict = {}
2604
    for inst in instances:
2605
      inst_lvs = {}
2606
      if not inst.admin_up:
2607
        continue
2608
      inst.MapLVsByNode(inst_lvs)
2609
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2610
      for node, vol_list in inst_lvs.iteritems():
2611
        for vol in vol_list:
2612
          nv_dict[(node, vol)] = inst
2613

    
2614
    if not nv_dict:
2615
      return result
2616

    
2617
    node_lvs = self.rpc.call_lv_list(nodes, [])
2618
    for node, node_res in node_lvs.items():
2619
      if node_res.offline:
2620
        continue
2621
      msg = node_res.fail_msg
2622
      if msg:
2623
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2624
        res_nodes[node] = msg
2625
        continue
2626

    
2627
      lvs = node_res.payload
2628
      for lv_name, (_, _, lv_online) in lvs.items():
2629
        inst = nv_dict.pop((node, lv_name), None)
2630
        if (not lv_online and inst is not None
2631
            and inst.name not in res_instances):
2632
          res_instances.append(inst.name)
2633

    
2634
    # any leftover items in nv_dict are missing LVs, let's arrange the
2635
    # data better
2636
    for key, inst in nv_dict.iteritems():
2637
      if inst.name not in res_missing:
2638
        res_missing[inst.name] = []
2639
      res_missing[inst.name].append(key)
2640

    
2641
    return result
2642
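  # Illustrative sketch of the result returned by Exec above (hypothetical
  # node, volume and instance names):
  #
  #   result = (
  #     {"node3": "Error while running lvs: ..."},    # per-node RPC errors
  #     ["inst1"],                                     # need activate-disks
  #     {"inst2": [("node1", "xenvg/disk0")]},         # missing (node, volume)
  #   )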

    
2643

    
2644
class LUClusterRepairDiskSizes(NoHooksLU):
2645
  """Verifies the cluster disks sizes.
2646

2647
  """
2648
  REQ_BGL = False
2649

    
2650
  def ExpandNames(self):
2651
    if self.op.instances:
2652
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
2653
      self.needed_locks = {
2654
        locking.LEVEL_NODE: [],
2655
        locking.LEVEL_INSTANCE: self.wanted_names,
2656
        }
2657
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2658
    else:
2659
      self.wanted_names = None
2660
      self.needed_locks = {
2661
        locking.LEVEL_NODE: locking.ALL_SET,
2662
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2663
        }
2664
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2665

    
2666
  def DeclareLocks(self, level):
2667
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2668
      self._LockInstancesNodes(primary_only=True)
2669

    
2670
  def CheckPrereq(self):
2671
    """Check prerequisites.
2672

2673
    This only checks the optional instance list against the existing names.
2674

2675
    """
2676
    if self.wanted_names is None:
2677
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
2678

    
2679
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2680
                             in self.wanted_names]
2681

    
2682
  def _EnsureChildSizes(self, disk):
2683
    """Ensure children of the disk have the needed disk size.
2684

2685
    This is valid mainly for DRBD8 and fixes an issue where the
2686
    children have smaller disk size.
2687

2688
    @param disk: an L{ganeti.objects.Disk} object
2689

2690
    """
2691
    if disk.dev_type == constants.LD_DRBD8:
2692
      assert disk.children, "Empty children for DRBD8?"
2693
      fchild = disk.children[0]
2694
      mismatch = fchild.size < disk.size
2695
      if mismatch:
2696
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2697
                     fchild.size, disk.size)
2698
        fchild.size = disk.size
2699

    
2700
      # and we recurse on this child only, not on the metadev
2701
      return self._EnsureChildSizes(fchild) or mismatch
2702
    else:
2703
      return False
2704

    
2705
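  # Illustrative sketch for _EnsureChildSizes above (hypothetical sizes in
  # MiB): for a DRBD8 disk whose data child is smaller than the parent,
  #
  #   disk.size = 10240, disk.children[0].size = 10176
  #
  # the child is grown to 10240 and the method returns True, so Exec below
  # re-writes the instance configuration and records the change.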
  def Exec(self, feedback_fn):
2706
    """Verify the size of cluster disks.
2707

2708
    """
2709
    # TODO: check child disks too
2710
    # TODO: check differences in size between primary/secondary nodes
2711
    per_node_disks = {}
2712
    for instance in self.wanted_instances:
2713
      pnode = instance.primary_node
2714
      if pnode not in per_node_disks:
2715
        per_node_disks[pnode] = []
2716
      for idx, disk in enumerate(instance.disks):
2717
        per_node_disks[pnode].append((instance, idx, disk))
2718

    
2719
    changed = []
2720
    for node, dskl in per_node_disks.items():
2721
      newl = [v[2].Copy() for v in dskl]
2722
      for dsk in newl:
2723
        self.cfg.SetDiskID(dsk, node)
2724
      result = self.rpc.call_blockdev_getsize(node, newl)
2725
      if result.fail_msg:
2726
        self.LogWarning("Failure in blockdev_getsize call to node"
2727
                        " %s, ignoring", node)
2728
        continue
2729
      if len(result.payload) != len(dskl):
2730
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
2731
                        " result.payload=%s", node, len(dskl), result.payload)
2732
        self.LogWarning("Invalid result from node %s, ignoring node results",
2733
                        node)
2734
        continue
2735
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
2736
        if size is None:
2737
          self.LogWarning("Disk %d of instance %s did not return size"
2738
                          " information, ignoring", idx, instance.name)
2739
          continue
2740
        if not isinstance(size, (int, long)):
2741
          self.LogWarning("Disk %d of instance %s did not return valid"
2742
                          " size information, ignoring", idx, instance.name)
2743
          continue
2744
        size = size >> 20
2745
        if size != disk.size:
2746
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2747
                       " correcting: recorded %d, actual %d", idx,
2748
                       instance.name, disk.size, size)
2749
          disk.size = size
2750
          self.cfg.Update(instance, feedback_fn)
2751
          changed.append((instance.name, idx, size))
2752
        if self._EnsureChildSizes(disk):
2753
          self.cfg.Update(instance, feedback_fn)
2754
          changed.append((instance.name, idx, disk.size))
2755
    return changed


class LUClusterRename(LogicalUnit):
2759
  """Rename the cluster.
2760

2761
  """
2762
  HPATH = "cluster-rename"
2763
  HTYPE = constants.HTYPE_CLUSTER
2764

    
2765
  def BuildHooksEnv(self):
2766
    """Build hooks env.
2767

2768
    """
2769
    return {
2770
      "OP_TARGET": self.cfg.GetClusterName(),
2771
      "NEW_NAME": self.op.name,
2772
      }
2773

    
2774
  def BuildHooksNodes(self):
2775
    """Build hooks nodes.
2776

2777
    """
2778
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
2779

    
2780
  def CheckPrereq(self):
2781
    """Verify that the passed name is a valid one.
2782

2783
    """
2784
    hostname = netutils.GetHostname(name=self.op.name,
2785
                                    family=self.cfg.GetPrimaryIPFamily())
2786

    
2787
    new_name = hostname.name
2788
    self.ip = new_ip = hostname.ip
2789
    old_name = self.cfg.GetClusterName()
2790
    old_ip = self.cfg.GetMasterIP()
2791
    if new_name == old_name and new_ip == old_ip:
2792
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2793
                                 " cluster has changed",
2794
                                 errors.ECODE_INVAL)
2795
    if new_ip != old_ip:
2796
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2797
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2798
                                   " reachable on the network" %
2799
                                   new_ip, errors.ECODE_NOTUNIQUE)
2800

    
2801
    self.op.name = new_name
2802

    
2803
  def Exec(self, feedback_fn):
2804
    """Rename the cluster.
2805

2806
    """
2807
    clustername = self.op.name
2808
    ip = self.ip
2809

    
2810
    # shutdown the master IP
2811
    master = self.cfg.GetMasterNode()
2812
    result = self.rpc.call_node_stop_master(master, False)
2813
    result.Raise("Could not disable the master role")
2814

    
2815
    try:
2816
      cluster = self.cfg.GetClusterInfo()
2817
      cluster.cluster_name = clustername
2818
      cluster.master_ip = ip
2819
      self.cfg.Update(cluster, feedback_fn)
2820

    
2821
      # update the known hosts file
2822
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2823
      node_list = self.cfg.GetOnlineNodeList()
2824
      try:
2825
        node_list.remove(master)
2826
      except ValueError:
2827
        pass
2828
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
2829
    finally:
2830
      result = self.rpc.call_node_start_master(master, False, False)
2831
      msg = result.fail_msg
2832
      if msg:
2833
        self.LogWarning("Could not re-enable the master role on"
2834
                        " the master, please restart manually: %s", msg)
2835

    
2836
    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

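    # helper_os applies a list of (DDM_ADD/DDM_REMOVE, name) modifications to
    # one of the cluster-level OS lists (hidden or blacklisted).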
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

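  # Poll the primary node for mirror status, sleeping between rounds and
  # giving up after 10 consecutive RPC failures.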
  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

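  # Also check the component devices (e.g. the LVs backing a DRBD disk).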
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

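    # Power-off and power-cycle must never be executed against the master
    # node from here; point the user at the OOB helper for a manual override.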
    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(self.nodes):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

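          # When powering on several nodes, pause for power_delay seconds
          # before moving on to the next one.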
          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))


class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of OSes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

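      # An OS is reported valid only when every node returns a valid first
      # entry; variants, parameters and API versions are reduced to the
      # common subset across nodes.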
      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      info.variants = list(variants)
      info.parameters = list(parameters)
      info.api_versions = list(api_versions)

      data[os_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]


class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node '%s', which is about to be removed, was not found"
                      " in the list of all nodes", self.op.node_name)
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    _RunPostHook(self, node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks[locking.LEVEL_NODE] = 1

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
                                        lu.cfg.GetHypervisorType())
      live_data = dict((name, nresult.payload)
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

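    # Build reverse mappings from each node name to the instances using it as
    # primary respectively secondary node.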
    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
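            # Find the instance owning this LV; the for/else falls through to
            # '-' when no instance uses it.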
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

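    # Query the storage backend on every selected node for the requested
    # fields.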
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)