root / lib / cmdlib.py @ 0db3d0b5

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import query
58
from ganeti import qlang
59
from ganeti import opcodes
60

    
61
import ganeti.masterd.instance # pylint: disable-msg=W0611
62

    
63

    
64
def _SupportsOob(cfg, node):
65
  """Tells if node supports OOB.
66

67
  @type cfg: L{config.ConfigWriter}
68
  @param cfg: The cluster configuration
69
  @type node: L{objects.Node}
70
  @param node: The node
71
  @return: The OOB script if supported or an empty string otherwise
72

73
  """
74
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
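# Illustrative usage sketch (an assumption, not code from this module): the
# helper is typically consulted before attempting out-of-band operations:
#
#   oob_program = _SupportsOob(self.cfg, node)  # node is an objects.Node
#   if not oob_program:
#     raise errors.OpPrereqError("OOB is not supported for node %s" %
#                                node.name, errors.ECODE_STATE)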
75

    
76

    
77
class ResultWithJobs:
78
  """Data container for LU results with jobs.
79

80
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
81
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
82
  contained in the C{jobs} attribute and include the job IDs in the opcode
83
  result.
84

85
  """
86
  def __init__(self, jobs, **kwargs):
87
    """Initializes this class.
88

89
    Additional return values can be specified as keyword arguments.
90

91
    @type jobs: list of lists of L{opcodes.OpCode}
92
    @param jobs: A list of lists of opcode objects
93

94
    """
95
    self.jobs = jobs
96
    self.other = kwargs
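# Illustrative sketch (assumed opcode and field names, not taken from this
# module): an LU's Exec() can queue follow-up jobs and extra return values:
#
#   return ResultWithJobs([[opcodes.OpInstanceStartup(instance_name=name)]
#                          for name in affected_instances],
#                         rebalanced=True)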
97

    
98

    
99
class LogicalUnit(object):
100
  """Logical Unit base class.
101

102
  Subclasses must follow these rules:
103
    - implement ExpandNames
104
    - implement CheckPrereq (except when tasklets are used)
105
    - implement Exec (except when tasklets are used)
106
    - implement BuildHooksEnv
107
    - implement BuildHooksNodes
108
    - redefine HPATH and HTYPE
109
    - optionally redefine their run requirements:
110
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
111

112
  Note that all commands require root permissions.
113

114
  @ivar dry_run_result: the value (if any) that will be returned to the caller
115
      in dry-run mode (signalled by opcode dry_run parameter)
116

117
  """
118
  HPATH = None
119
  HTYPE = None
120
  REQ_BGL = True
121

    
122
  def __init__(self, processor, op, context, rpc):
123
    """Constructor for LogicalUnit.
124

125
    This needs to be overridden in derived classes in order to check op
126
    validity.
127

128
    """
129
    self.proc = processor
130
    self.op = op
131
    self.cfg = context.cfg
132
    self.glm = context.glm
133
    self.context = context
134
    self.rpc = rpc
135
    # Dicts used to declare locking needs to mcpu
136
    self.needed_locks = None
137
    self.acquired_locks = {}
138
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
139
    self.add_locks = {}
140
    self.remove_locks = {}
141
    # Used to force good behavior when calling helper functions
142
    self.recalculate_locks = {}
143
    # logging
144
    self.Log = processor.Log # pylint: disable-msg=C0103
145
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
146
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
147
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
148
    # support for dry-run
149
    self.dry_run_result = None
150
    # support for generic debug attribute
151
    if (not hasattr(self.op, "debug_level") or
152
        not isinstance(self.op.debug_level, int)):
153
      self.op.debug_level = 0
154

    
155
    # Tasklets
156
    self.tasklets = None
157

    
158
    # Validate opcode parameters and set defaults
159
    self.op.Validate(True)
160

    
161
    self.CheckArguments()
162

    
163
  def CheckArguments(self):
164
    """Check syntactic validity for the opcode arguments.
165

166
    This method is for doing a simple syntactic check and ensuring the
167
    validity of opcode parameters, without any cluster-related
168
    checks. While the same can be accomplished in ExpandNames and/or
169
    CheckPrereq, doing these separately is better because:
170

171
      - ExpandNames is left as purely a lock-related function
172
      - CheckPrereq is run after we have acquired locks (and possibly
173
        waited for them)
174

175
    The function is allowed to change the self.op attribute so that
176
    later methods no longer need to worry about missing parameters.
177

178
    """
179
    pass
180

    
181
  def ExpandNames(self):
182
    """Expand names for this LU.
183

184
    This method is called before starting to execute the opcode, and it should
185
    update all the parameters of the opcode to their canonical form (e.g. a
186
    short node name must be fully expanded after this method has successfully
187
    completed). This way locking, hooks, logging, etc. can work correctly.
188

189
    LUs which implement this method must also populate the self.needed_locks
190
    member, as a dict with lock levels as keys, and a list of needed lock names
191
    as values. Rules:
192

193
      - use an empty dict if you don't need any lock
194
      - if you don't need any lock at a particular level omit that level
195
      - don't put anything for the BGL level
196
      - if you want all locks at a level use locking.ALL_SET as a value
197

198
    If you need to share locks (rather than acquire them exclusively) at one
199
    level you can modify self.share_locks, setting a true value (usually 1) for
200
    that level. By default locks are not shared.
201

202
    This function can also define a list of tasklets, which then will be
203
    executed in order instead of the usual LU-level CheckPrereq and Exec
204
    functions, if those are not defined by the LU.
205

206
    Examples::
207

208
      # Acquire all nodes and one instance
209
      self.needed_locks = {
210
        locking.LEVEL_NODE: locking.ALL_SET,
211
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
212
      }
213
      # Acquire just two nodes
214
      self.needed_locks = {
215
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
216
      }
217
      # Acquire no locks
218
      self.needed_locks = {} # No, you can't leave it to the default value None
219

220
    """
221
    # The implementation of this method is mandatory only if the new LU is
222
    # concurrent, so that old LUs don't need to be changed all at the same
223
    # time.
224
    if self.REQ_BGL:
225
      self.needed_locks = {} # Exclusive LUs don't need locks.
226
    else:
227
      raise NotImplementedError
228

    
229
  def DeclareLocks(self, level):
230
    """Declare LU locking needs for a level
231

232
    While most LUs can just declare their locking needs at ExpandNames time,
233
    sometimes there's the need to calculate some locks after having acquired
234
    the ones before. This function is called just before acquiring locks at a
235
    particular level, but after acquiring the ones at lower levels, and permits
236
    such calculations. It can be used to modify self.needed_locks, and by
237
    default it does nothing.
238

239
    This function is only called if you have something already set in
240
    self.needed_locks for the level.
241

242
    @param level: Locking level which is going to be locked
243
    @type level: member of ganeti.locking.LEVELS
244

245
    """
246

    
247
  def CheckPrereq(self):
248
    """Check prerequisites for this LU.
249

250
    This method should check that the prerequisites for the execution
251
    of this LU are fulfilled. It can do internode communication, but
252
    it should be idempotent - no cluster or system changes are
253
    allowed.
254

255
    The method should raise errors.OpPrereqError in case something is
256
    not fulfilled. Its return value is ignored.
257

258
    This method should also update all the parameters of the opcode to
259
    their canonical form if it hasn't been done by ExpandNames before.
260

261
    """
262
    if self.tasklets is not None:
263
      for (idx, tl) in enumerate(self.tasklets):
264
        logging.debug("Checking prerequisites for tasklet %s/%s",
265
                      idx + 1, len(self.tasklets))
266
        tl.CheckPrereq()
267
    else:
268
      pass
269

    
270
  def Exec(self, feedback_fn):
271
    """Execute the LU.
272

273
    This method should implement the actual work. It should raise
274
    errors.OpExecError for failures that are somewhat dealt with in
275
    code, or expected.
276

277
    """
278
    if self.tasklets is not None:
279
      for (idx, tl) in enumerate(self.tasklets):
280
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
281
        tl.Exec(feedback_fn)
282
    else:
283
      raise NotImplementedError
284

    
285
  def BuildHooksEnv(self):
286
    """Build hooks environment for this LU.
287

288
    @rtype: dict
289
    @return: Dictionary containing the environment that will be used for
290
      running the hooks for this LU. The keys of the dict must not be prefixed
291
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
292
      will extend the environment with additional variables. If no environment
293
      should be defined, an empty dictionary should be returned (not C{None}).
294
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
295
      will not be called.
296

297
    """
298
    raise NotImplementedError
299

    
300
  def BuildHooksNodes(self):
301
    """Build list of nodes to run LU's hooks.
302

303
    @rtype: tuple; (list, list)
304
    @return: Tuple containing a list of node names on which the hook
305
      should run before the execution and a list of node names on which the
306
      hook should run after the execution. No nodes should be returned as an
307
      empty list (and not None).
308
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
309
      will not be called.
310

311
    """
312
    raise NotImplementedError
313

    
314
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
315
    """Notify the LU about the results of its hooks.
316

317
    This method is called every time a hooks phase is executed, and notifies
318
    the Logical Unit about the hooks' result. The LU can then use it to alter
319
    its result based on the hooks.  By default the method does nothing and the
320
    previous result is passed back unchanged, but any LU can override it if it
321
    wants to use the local cluster hook-scripts somehow.
322

323
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
324
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
325
    @param hook_results: the results of the multi-node hooks rpc call
326
    @param feedback_fn: function used to send feedback back to the caller
327
    @param lu_result: the previous Exec result this LU had, or None
328
        in the PRE phase
329
    @return: the new Exec result, based on the previous result
330
        and hook results
331

332
    """
333
    # API must be kept, thus we ignore the unused argument and could
334
    # be a function warnings
335
    # pylint: disable-msg=W0613,R0201
336
    return lu_result
337

    
338
  def _ExpandAndLockInstance(self):
339
    """Helper function to expand and lock an instance.
340

341
    Many LUs that work on an instance take its name in self.op.instance_name
342
    and need to expand it and then declare the expanded name for locking. This
343
    function does it, and then updates self.op.instance_name to the expanded
344
    name. It also initializes needed_locks as a dict, if this hasn't been done
345
    before.
346

347
    """
348
    if self.needed_locks is None:
349
      self.needed_locks = {}
350
    else:
351
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
352
        "_ExpandAndLockInstance called with instance-level locks set"
353
    self.op.instance_name = _ExpandInstanceName(self.cfg,
354
                                                self.op.instance_name)
355
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
356

    
357
  def _LockInstancesNodes(self, primary_only=False):
358
    """Helper function to declare instances' nodes for locking.
359

360
    This function should be called after locking one or more instances to lock
361
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
362
    with all primary or secondary nodes for instances already locked and
363
    present in self.needed_locks[locking.LEVEL_INSTANCE].
364

365
    It should be called from DeclareLocks, and for safety only works if
366
    self.recalculate_locks[locking.LEVEL_NODE] is set.
367

368
    In the future it may grow parameters to just lock some instance's nodes, or
369
    to just lock primaries or secondary nodes, if needed.
370

371
    It should be called in DeclareLocks in a way similar to::
372

373
      if level == locking.LEVEL_NODE:
374
        self._LockInstancesNodes()
375

376
    @type primary_only: boolean
377
    @param primary_only: only lock primary nodes of locked instances
378

379
    """
380
    assert locking.LEVEL_NODE in self.recalculate_locks, \
381
      "_LockInstancesNodes helper function called with no nodes to recalculate"
382

    
383
    # TODO: check if we've really been called with the instance locks held
384

    
385
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
386
    # future we might want to have different behaviors depending on the value
387
    # of self.recalculate_locks[locking.LEVEL_NODE]
388
    wanted_nodes = []
389
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
390
      instance = self.context.cfg.GetInstanceInfo(instance_name)
391
      wanted_nodes.append(instance.primary_node)
392
      if not primary_only:
393
        wanted_nodes.extend(instance.secondary_nodes)
394

    
395
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
396
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
397
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
398
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
399

    
400
    del self.recalculate_locks[locking.LEVEL_NODE]
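# Illustrative sketch of the usual pattern in instance LUs (a generic example
# that follows the docstrings above, not copied from a specific LU):
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()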
401

    
402

    
403
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
404
  """Simple LU which runs no hooks.
405

406
  This LU is intended as a parent for other LogicalUnits which will
407
  run no hooks, in order to reduce duplicate code.
408

409
  """
410
  HPATH = None
411
  HTYPE = None
412

    
413
  def BuildHooksEnv(self):
414
    """Empty BuildHooksEnv for NoHooksLu.
415

416
    This just raises an error.
417

418
    """
419
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
420

    
421
  def BuildHooksNodes(self):
422
    """Empty BuildHooksNodes for NoHooksLU.
423

424
    """
425
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
426

    
427

    
428
class Tasklet:
429
  """Tasklet base class.
430

431
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
432
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
433
  tasklets know nothing about locks.
434

435
  Subclasses must follow these rules:
436
    - Implement CheckPrereq
437
    - Implement Exec
438

439
  """
440
  def __init__(self, lu):
441
    self.lu = lu
442

    
443
    # Shortcuts
444
    self.cfg = lu.cfg
445
    self.rpc = lu.rpc
446

    
447
  def CheckPrereq(self):
448
    """Check prerequisites for this tasklets.
449

450
    This method should check whether the prerequisites for the execution of
451
    this tasklet are fulfilled. It can do internode communication, but it
452
    should be idempotent - no cluster or system changes are allowed.
453

454
    The method should raise errors.OpPrereqError in case something is not
455
    fulfilled. Its return value is ignored.
456

457
    This method should also update all parameters to their canonical form if it
458
    hasn't been done before.
459

460
    """
461
    pass
462

    
463
  def Exec(self, feedback_fn):
464
    """Execute the tasklet.
465

466
    This method should implement the actual work. It should raise
467
    errors.OpExecError for failures that are somewhat dealt with in code, or
468
    expected.
469

470
    """
471
    raise NotImplementedError
472

    
473

    
474
class _QueryBase:
475
  """Base for query utility classes.
476

477
  """
478
  #: Attribute holding field definitions
479
  FIELDS = None
480

    
481
  def __init__(self, filter_, fields, use_locking):
482
    """Initializes this class.
483

484
    """
485
    self.use_locking = use_locking
486

    
487
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
488
                             namefield="name")
489
    self.requested_data = self.query.RequestedData()
490
    self.names = self.query.RequestedNames()
491

    
492
    # Sort only if no names were requested
493
    self.sort_by_name = not self.names
494

    
495
    self.do_locking = None
496
    self.wanted = None
497

    
498
  def _GetNames(self, lu, all_names, lock_level):
499
    """Helper function to determine names asked for in the query.
500

501
    """
502
    if self.do_locking:
503
      names = lu.acquired_locks[lock_level]
504
    else:
505
      names = all_names
506

    
507
    if self.wanted == locking.ALL_SET:
508
      assert not self.names
509
      # caller didn't specify names, so ordering is not important
510
      return utils.NiceSort(names)
511

    
512
    # caller specified names and we must keep the same order
513
    assert self.names
514
    assert not self.do_locking or lu.acquired_locks[lock_level]
515

    
516
    missing = set(self.wanted).difference(names)
517
    if missing:
518
      raise errors.OpExecError("Some items were removed before retrieving"
519
                               " their data: %s" % missing)
520

    
521
    # Return expanded names
522
    return self.wanted
523

    
524
  def ExpandNames(self, lu):
525
    """Expand names for this query.
526

527
    See L{LogicalUnit.ExpandNames}.
528

529
    """
530
    raise NotImplementedError()
531

    
532
  def DeclareLocks(self, lu, level):
533
    """Declare locks for this query.
534

535
    See L{LogicalUnit.DeclareLocks}.
536

537
    """
538
    raise NotImplementedError()
539

    
540
  def _GetQueryData(self, lu):
541
    """Collects all data for this query.
542

543
    @return: Query data object
544

545
    """
546
    raise NotImplementedError()
547

    
548
  def NewStyleQuery(self, lu):
549
    """Collect data and execute query.
550

551
    """
552
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
553
                                  sort_by_name=self.sort_by_name)
554

    
555
  def OldStyleQuery(self, lu):
556
    """Collect data and execute query.
557

558
    """
559
    return self.query.OldStyleQuery(self._GetQueryData(lu),
560
                                    sort_by_name=self.sort_by_name)
561

    
562

    
563
def _GetWantedNodes(lu, nodes):
564
  """Returns list of checked and expanded node names.
565

566
  @type lu: L{LogicalUnit}
567
  @param lu: the logical unit on whose behalf we execute
568
  @type nodes: list
569
  @param nodes: list of node names or None for all nodes
570
  @rtype: list
571
  @return: the list of nodes, sorted
572
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
573

574
  """
575
  if nodes:
576
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
577

    
578
  return utils.NiceSort(lu.cfg.GetNodeList())
579

    
580

    
581
def _GetWantedInstances(lu, instances):
582
  """Returns list of checked and expanded instance names.
583

584
  @type lu: L{LogicalUnit}
585
  @param lu: the logical unit on whose behalf we execute
586
  @type instances: list
587
  @param instances: list of instance names or None for all instances
588
  @rtype: list
589
  @return: the list of instances, sorted
590
  @raise errors.OpPrereqError: if the instances parameter is wrong type
591
  @raise errors.OpPrereqError: if any of the passed instances is not found
592

593
  """
594
  if instances:
595
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
596
  else:
597
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
598
  return wanted
599

    
600

    
601
def _GetUpdatedParams(old_params, update_dict,
602
                      use_default=True, use_none=False):
603
  """Return the new version of a parameter dictionary.
604

605
  @type old_params: dict
606
  @param old_params: old parameters
607
  @type update_dict: dict
608
  @param update_dict: dict containing new parameter values, or
609
      constants.VALUE_DEFAULT to reset the parameter to its default
610
      value
611
  @type use_default: boolean
612
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
613
      values as 'to be deleted' values
614
  @type use_none: boolean
615
  @param use_none: whether to recognise C{None} values as 'to be
616
      deleted' values
617
  @rtype: dict
618
  @return: the new parameter dictionary
619

620
  """
621
  params_copy = copy.deepcopy(old_params)
622
  for key, val in update_dict.iteritems():
623
    if ((use_default and val == constants.VALUE_DEFAULT) or
624
        (use_none and val is None)):
625
      try:
626
        del params_copy[key]
627
      except KeyError:
628
        pass
629
    else:
630
      params_copy[key] = val
631
  return params_copy
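# Illustrative example (made-up parameter names and values): resetting one
# entry to its default while updating another:
#
#   _GetUpdatedParams({"kernel_path": "/vmlinuz", "root_path": "/dev/vda1"},
#                     {"kernel_path": constants.VALUE_DEFAULT,
#                      "root_path": "/dev/vda2"})
#   # -> {"root_path": "/dev/vda2"}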
632

    
633

    
634
def _ReleaseLocks(lu, level, names=None, keep=None):
635
  """Releases locks owned by an LU.
636

637
  @type lu: L{LogicalUnit}
638
  @param level: Lock level
639
  @type names: list or None
640
  @param names: Names of locks to release
641
  @type keep: list or None
642
  @param keep: Names of locks to retain
643

644
  """
645
  assert not (keep is not None and names is not None), \
646
         "Only one of the 'names' and the 'keep' parameters can be given"
647

    
648
  if names is not None:
649
    should_release = names.__contains__
650
  elif keep:
651
    should_release = lambda name: name not in keep
652
  else:
653
    should_release = None
654

    
655
  if should_release:
656
    retain = []
657
    release = []
658

    
659
    # Determine which locks to release
660
    for name in lu.acquired_locks[level]:
661
      if should_release(name):
662
        release.append(name)
663
      else:
664
        retain.append(name)
665

    
666
    assert len(lu.acquired_locks[level]) == (len(retain) + len(release))
667

    
668
    # Release just some locks
669
    lu.glm.release(level, names=release)
670
    lu.acquired_locks[level] = retain
671

    
672
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
673
  else:
674
    # Release everything
675
    lu.glm.release(level)
676
    del lu.acquired_locks[level]
677

    
678
    assert not lu.glm.list_owned(level), "No locks should be owned"
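# Illustrative usage (assumed attribute names): once an LU has narrowed down
# the nodes it really needs, surplus node locks can be given back early:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[self.op.pnode, self.op.snode])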
679

    
680

    
681
def _RunPostHook(lu, node_name):
682
  """Runs the post-hook for an opcode on a single node.
683

684
  """
685
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
686
  try:
687
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
688
  except:
689
    # pylint: disable-msg=W0702
690
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
691

    
692

    
693
def _CheckOutputFields(static, dynamic, selected):
694
  """Checks whether all selected fields are valid.
695

696
  @type static: L{utils.FieldSet}
697
  @param static: static fields set
698
  @type dynamic: L{utils.FieldSet}
699
  @param dynamic: dynamic fields set
700

701
  """
702
  f = utils.FieldSet()
703
  f.Extend(static)
704
  f.Extend(dynamic)
705

    
706
  delta = f.NonMatching(selected)
707
  if delta:
708
    raise errors.OpPrereqError("Unknown output fields selected: %s"
709
                               % ",".join(delta), errors.ECODE_INVAL)
710

    
711

    
712
def _CheckGlobalHvParams(params):
713
  """Validates that given hypervisor params are not global ones.
714

715
  This will ensure that instances don't get customised versions of
716
  global params.
717

718
  """
719
  used_globals = constants.HVC_GLOBALS.intersection(params)
720
  if used_globals:
721
    msg = ("The following hypervisor parameters are global and cannot"
722
           " be customized at instance level, please modify them at"
723
           " cluster level: %s" % utils.CommaJoin(used_globals))
724
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
725

    
726

    
727
def _CheckNodeOnline(lu, node, msg=None):
728
  """Ensure that a given node is online.
729

730
  @param lu: the LU on behalf of which we make the check
731
  @param node: the node to check
732
  @param msg: if passed, should be a message to replace the default one
733
  @raise errors.OpPrereqError: if the node is offline
734

735
  """
736
  if msg is None:
737
    msg = "Can't use offline node"
738
  if lu.cfg.GetNodeInfo(node).offline:
739
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
740

    
741

    
742
def _CheckNodeNotDrained(lu, node):
743
  """Ensure that a given node is not drained.
744

745
  @param lu: the LU on behalf of which we make the check
746
  @param node: the node to check
747
  @raise errors.OpPrereqError: if the node is drained
748

749
  """
750
  if lu.cfg.GetNodeInfo(node).drained:
751
    raise errors.OpPrereqError("Can't use drained node %s" % node,
752
                               errors.ECODE_STATE)
753

    
754

    
755
def _CheckNodeVmCapable(lu, node):
756
  """Ensure that a given node is vm capable.
757

758
  @param lu: the LU on behalf of which we make the check
759
  @param node: the node to check
760
  @raise errors.OpPrereqError: if the node is not vm capable
761

762
  """
763
  if not lu.cfg.GetNodeInfo(node).vm_capable:
764
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
765
                               errors.ECODE_STATE)
766

    
767

    
768
def _CheckNodeHasOS(lu, node, os_name, force_variant):
769
  """Ensure that a node supports a given OS.
770

771
  @param lu: the LU on behalf of which we make the check
772
  @param node: the node to check
773
  @param os_name: the OS to query about
774
  @param force_variant: whether to ignore variant errors
775
  @raise errors.OpPrereqError: if the node is not supporting the OS
776

777
  """
778
  result = lu.rpc.call_os_get(node, os_name)
779
  result.Raise("OS '%s' not in supported OS list for node %s" %
780
               (os_name, node),
781
               prereq=True, ecode=errors.ECODE_INVAL)
782
  if not force_variant:
783
    _CheckOSVariant(result.payload, os_name)
784

    
785

    
786
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
787
  """Ensure that a node has the given secondary ip.
788

789
  @type lu: L{LogicalUnit}
790
  @param lu: the LU on behalf of which we make the check
791
  @type node: string
792
  @param node: the node to check
793
  @type secondary_ip: string
794
  @param secondary_ip: the ip to check
795
  @type prereq: boolean
796
  @param prereq: whether to throw a prerequisite or an execute error
797
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
798
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
799

800
  """
801
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
802
  result.Raise("Failure checking secondary ip on node %s" % node,
803
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
804
  if not result.payload:
805
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
806
           " please fix and re-run this command" % secondary_ip)
807
    if prereq:
808
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
809
    else:
810
      raise errors.OpExecError(msg)
811

    
812

    
813
def _GetClusterDomainSecret():
814
  """Reads the cluster domain secret.
815

816
  """
817
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
818
                               strict=True)
819

    
820

    
821
def _CheckInstanceDown(lu, instance, reason):
822
  """Ensure that an instance is not running."""
823
  if instance.admin_up:
824
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
825
                               (instance.name, reason), errors.ECODE_STATE)
826

    
827
  pnode = instance.primary_node
828
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
829
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
830
              prereq=True, ecode=errors.ECODE_ENVIRON)
831

    
832
  if instance.name in ins_l.payload:
833
    raise errors.OpPrereqError("Instance %s is running, %s" %
834
                               (instance.name, reason), errors.ECODE_STATE)
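# Illustrative usage (the reason string is arbitrary): LUs that must not touch
# a running instance call this during CheckPrereq:
#
#   _CheckInstanceDown(self, instance, "cannot change its disk layout")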
835

    
836

    
837
def _ExpandItemName(fn, name, kind):
838
  """Expand an item name.
839

840
  @param fn: the function to use for expansion
841
  @param name: requested item name
842
  @param kind: text description ('Node' or 'Instance')
843
  @return: the resolved (full) name
844
  @raise errors.OpPrereqError: if the item is not found
845

846
  """
847
  full_name = fn(name)
848
  if full_name is None:
849
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
850
                               errors.ECODE_NOENT)
851
  return full_name
852

    
853

    
854
def _ExpandNodeName(cfg, name):
855
  """Wrapper over L{_ExpandItemName} for nodes."""
856
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
857

    
858

    
859
def _ExpandInstanceName(cfg, name):
860
  """Wrapper over L{_ExpandItemName} for instance."""
861
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
862

    
863

    
864
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
865
                          memory, vcpus, nics, disk_template, disks,
866
                          bep, hvp, hypervisor_name):
867
  """Builds instance related env variables for hooks
868

869
  This builds the hook environment from individual variables.
870

871
  @type name: string
872
  @param name: the name of the instance
873
  @type primary_node: string
874
  @param primary_node: the name of the instance's primary node
875
  @type secondary_nodes: list
876
  @param secondary_nodes: list of secondary nodes as strings
877
  @type os_type: string
878
  @param os_type: the name of the instance's OS
879
  @type status: boolean
880
  @param status: the should_run status of the instance
881
  @type memory: string
882
  @param memory: the memory size of the instance
883
  @type vcpus: string
884
  @param vcpus: the count of VCPUs the instance has
885
  @type nics: list
886
  @param nics: list of tuples (ip, mac, mode, link) representing
887
      the NICs the instance has
888
  @type disk_template: string
889
  @param disk_template: the disk template of the instance
890
  @type disks: list
891
  @param disks: the list of (size, mode) pairs
892
  @type bep: dict
893
  @param bep: the backend parameters for the instance
894
  @type hvp: dict
895
  @param hvp: the hypervisor parameters for the instance
896
  @type hypervisor_name: string
897
  @param hypervisor_name: the hypervisor for the instance
898
  @rtype: dict
899
  @return: the hook environment for this instance
900

901
  """
902
  if status:
903
    str_status = "up"
904
  else:
905
    str_status = "down"
906
  env = {
907
    "OP_TARGET": name,
908
    "INSTANCE_NAME": name,
909
    "INSTANCE_PRIMARY": primary_node,
910
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
911
    "INSTANCE_OS_TYPE": os_type,
912
    "INSTANCE_STATUS": str_status,
913
    "INSTANCE_MEMORY": memory,
914
    "INSTANCE_VCPUS": vcpus,
915
    "INSTANCE_DISK_TEMPLATE": disk_template,
916
    "INSTANCE_HYPERVISOR": hypervisor_name,
917
  }
918

    
919
  if nics:
920
    nic_count = len(nics)
921
    for idx, (ip, mac, mode, link) in enumerate(nics):
922
      if ip is None:
923
        ip = ""
924
      env["INSTANCE_NIC%d_IP" % idx] = ip
925
      env["INSTANCE_NIC%d_MAC" % idx] = mac
926
      env["INSTANCE_NIC%d_MODE" % idx] = mode
927
      env["INSTANCE_NIC%d_LINK" % idx] = link
928
      if mode == constants.NIC_MODE_BRIDGED:
929
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
930
  else:
931
    nic_count = 0
932

    
933
  env["INSTANCE_NIC_COUNT"] = nic_count
934

    
935
  if disks:
936
    disk_count = len(disks)
937
    for idx, (size, mode) in enumerate(disks):
938
      env["INSTANCE_DISK%d_SIZE" % idx] = size
939
      env["INSTANCE_DISK%d_MODE" % idx] = mode
940
  else:
941
    disk_count = 0
942

    
943
  env["INSTANCE_DISK_COUNT"] = disk_count
944

    
945
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
946
    for key, value in source.items():
947
      env["INSTANCE_%s_%s" % (kind, key)] = value
948

    
949
  return env
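# Illustrative result (made-up values): for an instance with one NIC and one
# disk, the returned environment contains entries along the lines of:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     ...
#   }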
950

    
951

    
952
def _NICListToTuple(lu, nics):
953
  """Build a list of nic information tuples.
954

955
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
956
  value in LUInstanceQueryData.
957

958
  @type lu:  L{LogicalUnit}
959
  @param lu: the logical unit on whose behalf we execute
960
  @type nics: list of L{objects.NIC}
961
  @param nics: list of nics to convert to hooks tuples
962

963
  """
964
  hooks_nics = []
965
  cluster = lu.cfg.GetClusterInfo()
966
  for nic in nics:
967
    ip = nic.ip
968
    mac = nic.mac
969
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
970
    mode = filled_params[constants.NIC_MODE]
971
    link = filled_params[constants.NIC_LINK]
972
    hooks_nics.append((ip, mac, mode, link))
973
  return hooks_nics
974

    
975

    
976
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
977
  """Builds instance related env variables for hooks from an object.
978

979
  @type lu: L{LogicalUnit}
980
  @param lu: the logical unit on whose behalf we execute
981
  @type instance: L{objects.Instance}
982
  @param instance: the instance for which we should build the
983
      environment
984
  @type override: dict
985
  @param override: dictionary with key/values that will override
986
      our values
987
  @rtype: dict
988
  @return: the hook environment dictionary
989

990
  """
991
  cluster = lu.cfg.GetClusterInfo()
992
  bep = cluster.FillBE(instance)
993
  hvp = cluster.FillHV(instance)
994
  args = {
995
    'name': instance.name,
996
    'primary_node': instance.primary_node,
997
    'secondary_nodes': instance.secondary_nodes,
998
    'os_type': instance.os,
999
    'status': instance.admin_up,
1000
    'memory': bep[constants.BE_MEMORY],
1001
    'vcpus': bep[constants.BE_VCPUS],
1002
    'nics': _NICListToTuple(lu, instance.nics),
1003
    'disk_template': instance.disk_template,
1004
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
1005
    'bep': bep,
1006
    'hvp': hvp,
1007
    'hypervisor_name': instance.hypervisor,
1008
  }
1009
  if override:
1010
    args.update(override)
1011
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1012

    
1013

    
1014
def _AdjustCandidatePool(lu, exceptions):
1015
  """Adjust the candidate pool after node operations.
1016

1017
  """
1018
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1019
  if mod_list:
1020
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1021
               utils.CommaJoin(node.name for node in mod_list))
1022
    for name in mod_list:
1023
      lu.context.ReaddNode(name)
1024
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1025
  if mc_now > mc_max:
1026
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1027
               (mc_now, mc_max))
1028

    
1029

    
1030
def _DecideSelfPromotion(lu, exceptions=None):
1031
  """Decide whether I should promote myself as a master candidate.
1032

1033
  """
1034
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1035
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1036
  # the new node will increase mc_max by one, so:
1037
  mc_should = min(mc_should + 1, cp_size)
1038
  return mc_now < mc_should
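# Worked example (made-up numbers): with candidate_pool_size = 10 and
# GetMasterCandidateStats() reporting mc_now = 3, mc_should = 3, the node
# being added computes min(3 + 1, 10) = 4 > 3, so it decides to promote
# itself to master candidate.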
1039

    
1040

    
1041
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1042
  """Check that the brigdes needed by a list of nics exist.
1043

1044
  """
1045
  cluster = lu.cfg.GetClusterInfo()
1046
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1047
  brlist = [params[constants.NIC_LINK] for params in paramslist
1048
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1049
  if brlist:
1050
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1051
    result.Raise("Error checking bridges on destination node '%s'" %
1052
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1053

    
1054

    
1055
def _CheckInstanceBridgesExist(lu, instance, node=None):
1056
  """Check that the brigdes needed by an instance exist.
1057

1058
  """
1059
  if node is None:
1060
    node = instance.primary_node
1061
  _CheckNicsBridgesExist(lu, instance.nics, node)
1062

    
1063

    
1064
def _CheckOSVariant(os_obj, name):
1065
  """Check whether an OS name conforms to the os variants specification.
1066

1067
  @type os_obj: L{objects.OS}
1068
  @param os_obj: OS object to check
1069
  @type name: string
1070
  @param name: OS name passed by the user, to check for validity
1071

1072
  """
1073
  if not os_obj.supported_variants:
1074
    return
1075
  variant = objects.OS.GetVariant(name)
1076
  if not variant:
1077
    raise errors.OpPrereqError("OS name must include a variant",
1078
                               errors.ECODE_INVAL)
1079

    
1080
  if variant not in os_obj.supported_variants:
1081
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1082

    
1083

    
1084
def _GetNodeInstancesInner(cfg, fn):
1085
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1086

    
1087

    
1088
def _GetNodeInstances(cfg, node_name):
1089
  """Returns a list of all primary and secondary instances on a node.
1090

1091
  """
1092

    
1093
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1094

    
1095

    
1096
def _GetNodePrimaryInstances(cfg, node_name):
1097
  """Returns primary instances on a node.
1098

1099
  """
1100
  return _GetNodeInstancesInner(cfg,
1101
                                lambda inst: node_name == inst.primary_node)
1102

    
1103

    
1104
def _GetNodeSecondaryInstances(cfg, node_name):
1105
  """Returns secondary instances on a node.
1106

1107
  """
1108
  return _GetNodeInstancesInner(cfg,
1109
                                lambda inst: node_name in inst.secondary_nodes)
1110

    
1111

    
1112
def _GetStorageTypeArgs(cfg, storage_type):
1113
  """Returns the arguments for a storage type.
1114

1115
  """
1116
  # Special case for file storage
1117
  if storage_type == constants.ST_FILE:
1118
    # storage.FileStorage wants a list of storage directories
1119
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1120

    
1121
  return []
1122

    
1123

    
1124
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1125
  faulty = []
1126

    
1127
  for dev in instance.disks:
1128
    cfg.SetDiskID(dev, node_name)
1129

    
1130
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1131
  result.Raise("Failed to get disk status from node %s" % node_name,
1132
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1133

    
1134
  for idx, bdev_status in enumerate(result.payload):
1135
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1136
      faulty.append(idx)
1137

    
1138
  return faulty
1139

    
1140

    
1141
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1142
  """Check the sanity of iallocator and node arguments and use the
1143
  cluster-wide iallocator if appropriate.
1144

1145
  Check that at most one of (iallocator, node) is specified. If none is
1146
  specified, then the LU's opcode's iallocator slot is filled with the
1147
  cluster-wide default iallocator.
1148

1149
  @type iallocator_slot: string
1150
  @param iallocator_slot: the name of the opcode iallocator slot
1151
  @type node_slot: string
1152
  @param node_slot: the name of the opcode target node slot
1153

1154
  """
1155
  node = getattr(lu.op, node_slot, None)
1156
  iallocator = getattr(lu.op, iallocator_slot, None)
1157

    
1158
  if node is not None and iallocator is not None:
1159
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1160
                               errors.ECODE_INVAL)
1161
  elif node is None and iallocator is None:
1162
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1163
    if default_iallocator:
1164
      setattr(lu.op, iallocator_slot, default_iallocator)
1165
    else:
1166
      raise errors.OpPrereqError("No iallocator or node given and no"
1167
                                 " cluster-wide default iallocator found."
1168
                                 " Please specify either an iallocator or a"
1169
                                 " node, or set a cluster-wide default"
1170
                                 " iallocator.")
1171

    
1172

    
1173
class LUClusterPostInit(LogicalUnit):
1174
  """Logical unit for running hooks after cluster initialization.
1175

1176
  """
1177
  HPATH = "cluster-init"
1178
  HTYPE = constants.HTYPE_CLUSTER
1179

    
1180
  def BuildHooksEnv(self):
1181
    """Build hooks env.
1182

1183
    """
1184
    return {
1185
      "OP_TARGET": self.cfg.GetClusterName(),
1186
      }
1187

    
1188
  def BuildHooksNodes(self):
1189
    """Build hooks nodes.
1190

1191
    """
1192
    return ([], [self.cfg.GetMasterNode()])
1193

    
1194
  def Exec(self, feedback_fn):
1195
    """Nothing to do.
1196

1197
    """
1198
    return True
1199

    
1200

    
1201
class LUClusterDestroy(LogicalUnit):
1202
  """Logical unit for destroying the cluster.
1203

1204
  """
1205
  HPATH = "cluster-destroy"
1206
  HTYPE = constants.HTYPE_CLUSTER
1207

    
1208
  def BuildHooksEnv(self):
1209
    """Build hooks env.
1210

1211
    """
1212
    return {
1213
      "OP_TARGET": self.cfg.GetClusterName(),
1214
      }
1215

    
1216
  def BuildHooksNodes(self):
1217
    """Build hooks nodes.
1218

1219
    """
1220
    return ([], [])
1221

    
1222
  def CheckPrereq(self):
1223
    """Check prerequisites.
1224

1225
    This checks whether the cluster is empty.
1226

1227
    Any errors are signaled by raising errors.OpPrereqError.
1228

1229
    """
1230
    master = self.cfg.GetMasterNode()
1231

    
1232
    nodelist = self.cfg.GetNodeList()
1233
    if len(nodelist) != 1 or nodelist[0] != master:
1234
      raise errors.OpPrereqError("There are still %d node(s) in"
1235
                                 " this cluster." % (len(nodelist) - 1),
1236
                                 errors.ECODE_INVAL)
1237
    instancelist = self.cfg.GetInstanceList()
1238
    if instancelist:
1239
      raise errors.OpPrereqError("There are still %d instance(s) in"
1240
                                 " this cluster." % len(instancelist),
1241
                                 errors.ECODE_INVAL)
1242

    
1243
  def Exec(self, feedback_fn):
1244
    """Destroys the cluster.
1245

1246
    """
1247
    master = self.cfg.GetMasterNode()
1248

    
1249
    # Run post hooks on master node before it's removed
1250
    _RunPostHook(self, master)
1251

    
1252
    result = self.rpc.call_node_stop_master(master, False)
1253
    result.Raise("Could not disable the master role")
1254

    
1255
    return master
1256

    
1257

    
1258
def _VerifyCertificate(filename):
1259
  """Verifies a certificate for LUClusterVerify.
1260

1261
  @type filename: string
1262
  @param filename: Path to PEM file
1263

1264
  """
1265
  try:
1266
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1267
                                           utils.ReadFile(filename))
1268
  except Exception, err: # pylint: disable-msg=W0703
1269
    return (LUClusterVerify.ETYPE_ERROR,
1270
            "Failed to load X509 certificate %s: %s" % (filename, err))
1271

    
1272
  (errcode, msg) = \
1273
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1274
                                constants.SSL_CERT_EXPIRATION_ERROR)
1275

    
1276
  if msg:
1277
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1278
  else:
1279
    fnamemsg = None
1280

    
1281
  if errcode is None:
1282
    return (None, fnamemsg)
1283
  elif errcode == utils.CERT_WARNING:
1284
    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
1285
  elif errcode == utils.CERT_ERROR:
1286
    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
1287

    
1288
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
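# Illustrative usage (the path is a placeholder): cluster verification feeds
# each certificate file through this helper and collects (error-type, message)
# pairs:
#
#   (errcode, msg) = _VerifyCertificate("/var/lib/ganeti/server.pem")
#   # errcode is None, LUClusterVerify.ETYPE_WARNING or
#   # LUClusterVerify.ETYPE_ERROR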
1289

    
1290

    
1291
class LUClusterVerify(LogicalUnit):
1292
  """Verifies the cluster status.
1293

1294
  """
1295
  HPATH = "cluster-verify"
1296
  HTYPE = constants.HTYPE_CLUSTER
1297
  REQ_BGL = False
1298

    
1299
  TCLUSTER = "cluster"
1300
  TNODE = "node"
1301
  TINSTANCE = "instance"
1302

    
1303
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1304
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1305
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1306
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1307
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1308
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1309
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1310
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1311
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1312
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1313
  ENODEDRBD = (TNODE, "ENODEDRBD")
1314
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1315
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1316
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1317
  ENODEHV = (TNODE, "ENODEHV")
1318
  ENODELVM = (TNODE, "ENODELVM")
1319
  ENODEN1 = (TNODE, "ENODEN1")
1320
  ENODENET = (TNODE, "ENODENET")
1321
  ENODEOS = (TNODE, "ENODEOS")
1322
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1323
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1324
  ENODERPC = (TNODE, "ENODERPC")
1325
  ENODESSH = (TNODE, "ENODESSH")
1326
  ENODEVERSION = (TNODE, "ENODEVERSION")
1327
  ENODESETUP = (TNODE, "ENODESETUP")
1328
  ENODETIME = (TNODE, "ENODETIME")
1329
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1330

    
1331
  ETYPE_FIELD = "code"
1332
  ETYPE_ERROR = "ERROR"
1333
  ETYPE_WARNING = "WARNING"
1334

    
1335
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1336

    
1337
  class NodeImage(object):
1338
    """A class representing the logical and physical status of a node.
1339

1340
    @type name: string
1341
    @ivar name: the node name to which this object refers
1342
    @ivar volumes: a structure as returned from
1343
        L{ganeti.backend.GetVolumeList} (runtime)
1344
    @ivar instances: a list of running instances (runtime)
1345
    @ivar pinst: list of configured primary instances (config)
1346
    @ivar sinst: list of configured secondary instances (config)
1347
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1348
        instances for which this node is secondary (config)
1349
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1350
    @ivar dfree: free disk, as reported by the node (runtime)
1351
    @ivar offline: the offline status (config)
1352
    @type rpc_fail: boolean
1353
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1354
        not whether the individual keys were correct) (runtime)
1355
    @type lvm_fail: boolean
1356
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1357
    @type hyp_fail: boolean
1358
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1359
    @type ghost: boolean
1360
    @ivar ghost: whether this is a known node or not (config)
1361
    @type os_fail: boolean
1362
    @ivar os_fail: whether the RPC call didn't return valid OS data
1363
    @type oslist: list
1364
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1365
    @type vm_capable: boolean
1366
    @ivar vm_capable: whether the node can host instances
1367

1368
    """
1369
    def __init__(self, offline=False, name=None, vm_capable=True):
1370
      self.name = name
1371
      self.volumes = {}
1372
      self.instances = []
1373
      self.pinst = []
1374
      self.sinst = []
1375
      self.sbp = {}
1376
      self.mfree = 0
1377
      self.dfree = 0
1378
      self.offline = offline
1379
      self.vm_capable = vm_capable
1380
      self.rpc_fail = False
1381
      self.lvm_fail = False
1382
      self.hyp_fail = False
1383
      self.ghost = False
1384
      self.os_fail = False
1385
      self.oslist = {}
1386

    
1387
  def ExpandNames(self):
1388
    self.needed_locks = {
1389
      locking.LEVEL_NODE: locking.ALL_SET,
1390
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1391
    }
1392
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1393

    
1394
  def _Error(self, ecode, item, msg, *args, **kwargs):
1395
    """Format an error message.
1396

1397
    Based on the opcode's error_codes parameter, either format a
1398
    parseable error code, or a simpler error string.
1399

1400
    This must be called only from Exec and functions called from Exec.
1401

1402
    """
1403
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1404
    itype, etxt = ecode
1405
    # first complete the msg
1406
    if args:
1407
      msg = msg % args
1408
    # then format the whole message
1409
    if self.op.error_codes:
1410
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1411
    else:
1412
      if item:
1413
        item = " " + item
1414
      else:
1415
        item = ""
1416
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1417
    # and finally report it via the feedback_fn
1418
    self._feedback_fn("  - %s" % msg)
1419

    
1420
  def _ErrorIf(self, cond, *args, **kwargs):
1421
    """Log an error message if the passed condition is True.
1422

1423
    """
1424
    cond = bool(cond) or self.op.debug_simulate_errors
1425
    if cond:
1426
      self._Error(*args, **kwargs)
1427
    # do not mark the operation as failed for WARN cases only
1428
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1429
      self.bad = self.bad or cond
1430

    
1431
  def _VerifyNode(self, ninfo, nresult):
1432
    """Perform some basic validation on data returned from a node.
1433

1434
      - check the result data structure is well formed and has all the
1435
        mandatory fields
1436
      - check ganeti version
1437

1438
    @type ninfo: L{objects.Node}
1439
    @param ninfo: the node to check
1440
    @param nresult: the results from the node
1441
    @rtype: boolean
1442
    @return: whether overall this call was successful (and we can expect
1443
         reasonable values in the response)
1444

1445
    """
1446
    node = ninfo.name
1447
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1448

    
1449
    # main result, nresult should be a non-empty dict
1450
    test = not nresult or not isinstance(nresult, dict)
1451
    _ErrorIf(test, self.ENODERPC, node,
1452
                  "unable to verify node: no data returned")
1453
    if test:
1454
      return False
1455

    
1456
    # compares ganeti version
1457
    local_version = constants.PROTOCOL_VERSION
1458
    remote_version = nresult.get("version", None)
1459
    test = not (remote_version and
1460
                isinstance(remote_version, (list, tuple)) and
1461
                len(remote_version) == 2)
1462
    _ErrorIf(test, self.ENODERPC, node,
1463
             "connection to node returned invalid data")
1464
    if test:
1465
      return False
1466

    
1467
    test = local_version != remote_version[0]
1468
    _ErrorIf(test, self.ENODEVERSION, node,
1469
             "incompatible protocol versions: master %s,"
1470
             " node %s", local_version, remote_version[0])
1471
    if test:
1472
      return False
1473

    
1474
    # node seems compatible, we can actually try to look into its results
1475

    
1476
    # full package version
1477
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1478
                  self.ENODEVERSION, node,
1479
                  "software version mismatch: master %s, node %s",
1480
                  constants.RELEASE_VERSION, remote_version[1],
1481
                  code=self.ETYPE_WARNING)
1482

    
1483
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1484
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1485
      for hv_name, hv_result in hyp_result.iteritems():
1486
        test = hv_result is not None
1487
        _ErrorIf(test, self.ENODEHV, node,
1488
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1489

    
1490
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1491
    if ninfo.vm_capable and isinstance(hvp_result, list):
1492
      for item, hv_name, hv_result in hvp_result:
1493
        _ErrorIf(True, self.ENODEHV, node,
1494
                 "hypervisor %s parameter verify failure (source %s): %s",
1495
                 hv_name, item, hv_result)
1496

    
1497
    test = nresult.get(constants.NV_NODESETUP,
1498
                       ["Missing NODESETUP results"])
1499
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1500
             "; ".join(test))
1501

    
1502
    return True
1503

    
1504
  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

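  # Illustrative sketch (comment only, not executed) of the acceptance window
  # used by _VerifyNodeTime above; names refer to the local variables there:
  #   skew = constants.NODE_MAX_CLOCK_SKEW
  #   ok = (nvinfo_starttime - skew) <= ntime_merged <= (nvinfo_endtime + skew)
  # A node outside this window is flagged with ENODETIME, reporting the
  # distance to the nearest edge of the window.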
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

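  # Worked example (hypothetical numbers) for the N+1 check above: if this
  # node is secondary for two auto-balanced instances whose primary is node
  # "nodeB", with BE_MEMORY of 2048 and 4096 MiB, then needed_mem for the
  # "nodeB" entry is 6144 MiB and the check fails whenever this node reports
  # mfree < 6144.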
  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_all_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    node_names = frozenset(node.name for node in nodeinfo)

    assert master_node in node_names
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
           "Found file listed in more than one file list"

    # Define functions determining which nodes to consider for a file
    file2nodefn = dict([(filename, fn)
      for (files, fn) in [(files_all, None),
                          (files_all_opt, None),
                          (files_mc, lambda node: (node.master_candidate or
                                                   node.name == master_node)),
                          (files_vm, lambda node: node.vm_capable)]
      for filename in files])

    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())

    for node in nodeinfo:
      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, cls.ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        continue

      for (filename, checksum) in node_files.items():
        # Check if the file should be considered for a node
        fn = file2nodefn[filename]
        if fn is None or fn(node):
          fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes)

      # Nodes missing file
      missing_file = node_names - with_file

      if filename in files_all_opt:
        # All or no nodes
        errorif(missing_file and missing_file != node_names,
                cls.ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no nodes (not"
                " found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, cls.ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

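  # Sketch of the structure built by _VerifyFiles above (hypothetical values):
  #   fileinfo = {
  #     "/etc/hosts": {"<checksum-a>": set(["node1", "node2"]),
  #                    "<checksum-b>": set(["node3"])},
  #     }
  # More than one checksum for the same file, or a file missing from some
  # nodes, is reported via ECLUSTERFILECHECK.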
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

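  # Sketch of the minor map computed in _VerifyNodeDrbd above (hypothetical
  # values):
  #   node_drbd = {0: ("instance1.example.com", True),
  #                1: ("ghost-instance", False)}
  # i.e. DRBD minor -> (instance name, whether the minor must be in use).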
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

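  # Sketch of nimg.oslist as built above (hypothetical values):
  #   nimg.oslist = {
  #     "debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
  #                      set(["default"]), set(), set([20]))],
  #     }
  # i.e. OS name -> list of (path, status, diagnose, variants, params, api).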
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # this will be caught in the backend too
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
               and not f_var, self.ENODEOS, node,
               "OS %s with API at least %d does not declare any variant",
               os_name, constants.OS_API_V15)
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, self.ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk

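  # Sketch of the value returned by _CollectDiskInfo above (hypothetical
  # names):
  #   instdisk = {
  #     "inst1.example.com": {"node1": [(True, "<status0>"), (True, "<status1>")],
  #                           "node2": [(False, "node offline")]},
  #     }
  # i.e. instance -> node -> one (success, payload) pair per disk.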
  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    cfg = self.cfg

    env = {
      "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in cfg.GetAllNodesInfo().values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.cfg.GetNodeList())

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable-msg=R0914
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    # Check the cluster certificates
    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    cluster = self.cfg.GetClusterInfo()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    # Compute the set of hypervisor parameters
    hvp_data = []
    for hv_name in hypervisors:
      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
    for os_name, os_hvp in cluster.os_hvp.items():
      for hv_name, hv_params in os_hvp.items():
        if not hv_params:
          continue
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
    # TODO: collapse identical parameter values in a single one
    for instance in instanceinfo.values():
      if not instance.hvparams:
        continue
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))
    # and verify them locally
    self._VerifyHVP(hvp_data)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS: hvp_data,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }

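    # The dictionary built above is the single request payload sent to every
    # node via call_node_verify: each NV_* key selects one check to run on the
    # node side and its value parametrizes that check (None meaning "no
    # parameters needed"). The LVM- and DRBD-related keys are only added
    # below when a volume group or DRBD helper is actually configured.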
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in nodeinfo)

    # Gather OOB paths
    oob_paths = []
    for node in nodeinfo:
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in instancelist:
      inst_config = instanceinfo[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          # ghost node
          gnode = self.NodeImage(name=nname)
          gnode.ghost = True
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

2357
    # except for the runtime information, which we'll gather next
2358

    
2359
    # Due to the way our RPC system works, exact response times cannot be
2360
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2361
    # time before and after executing the request, we can at least have a time
2362
    # window.
2363
    nvinfo_starttime = time.time()
2364
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2365
                                           self.cfg.GetClusterName())
2366
    nvinfo_endtime = time.time()
2367

    
2368
    all_drbd_map = self.cfg.ComputeDRBDMap()
2369

    
2370
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2371
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2372

    
2373
    feedback_fn("* Verifying configuration file consistency")
2374
    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
2375

    
2376
    feedback_fn("* Verifying node status")
2377

    
2378
    refos_img = None
2379

    
2380
    for node_i in nodeinfo:
2381
      node = node_i.name
2382
      nimg = node_image[node]
2383

    
2384
      if node_i.offline:
2385
        if verbose:
2386
          feedback_fn("* Skipping offline node %s" % (node,))
2387
        n_offline += 1
2388
        continue
2389

    
2390
      if node == master_node:
2391
        ntype = "master"
2392
      elif node_i.master_candidate:
2393
        ntype = "master candidate"
2394
      elif node_i.drained:
2395
        ntype = "drained"
2396
        n_drained += 1
2397
      else:
2398
        ntype = "regular"
2399
      if verbose:
2400
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2401

    
2402
      msg = all_nvinfo[node].fail_msg
2403
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2404
      if msg:
2405
        nimg.rpc_fail = True
2406
        continue
2407

    
2408
      nresult = all_nvinfo[node].payload
2409

    
2410
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2411
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2412
      self._VerifyNodeNetwork(node_i, nresult)
2413
      self._VerifyOob(node_i, nresult)
2414

    
2415
      if nimg.vm_capable:
2416
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2417
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2418
                             all_drbd_map)
2419

    
2420
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2421
        self._UpdateNodeInstances(node_i, nresult, nimg)
2422
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2423
        self._UpdateNodeOS(node_i, nresult, nimg)
2424
        if not nimg.os_fail:
2425
          if refos_img is None:
2426
            refos_img = nimg
2427
          self._VerifyNodeOS(node_i, nimg, refos_img)
2428

    
2429
    feedback_fn("* Verifying instance status")
2430
    for instance in instancelist:
2431
      if verbose:
2432
        feedback_fn("* Verifying instance %s" % instance)
2433
      inst_config = instanceinfo[instance]
2434
      self._VerifyInstance(instance, inst_config, node_image,
2435
                           instdisk[instance])
2436
      inst_nodes_offline = []
2437

    
2438
      pnode = inst_config.primary_node
2439
      pnode_img = node_image[pnode]
2440
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2441
               self.ENODERPC, pnode, "instance %s, connection to"
2442
               " primary node failed", instance)
2443

    
2444
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2445
               self.EINSTANCEBADNODE, instance,
2446
               "instance is marked as running and lives on offline node %s",
2447
               inst_config.primary_node)
2448

    
2449
      # If the instance is non-redundant we cannot survive losing its primary
2450
      # node, so we are not N+1 compliant. On the other hand we have no disk
2451
      # templates with more than one secondary so that situation is not well
2452
      # supported either.
2453
      # FIXME: does not support file-backed instances
2454
      if not inst_config.secondary_nodes:
2455
        i_non_redundant.append(instance)
2456

    
2457
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2458
               instance, "instance has multiple secondary nodes: %s",
2459
               utils.CommaJoin(inst_config.secondary_nodes),
2460
               code=self.ETYPE_WARNING)
2461

    
2462
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2463
        pnode = inst_config.primary_node
2464
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2465
        instance_groups = {}
2466

    
2467
        for node in instance_nodes:
2468
          instance_groups.setdefault(nodeinfo_byname[node].group,
2469
                                     []).append(node)
2470

    
2471
        pretty_list = [
2472
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2473
          # Sort so that we always list the primary node first.
2474
          for group, nodes in sorted(instance_groups.items(),
2475
                                     key=lambda (_, nodes): pnode in nodes,
2476
                                     reverse=True)]
2477

    
2478
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2479
                      instance, "instance has primary and secondary nodes in"
2480
                      " different groups: %s", utils.CommaJoin(pretty_list),
2481
                      code=self.ETYPE_WARNING)
2482

    
2483
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2484
        i_non_a_balanced.append(instance)
2485

    
2486
      for snode in inst_config.secondary_nodes:
2487
        s_img = node_image[snode]
2488
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2489
                 "instance %s, connection to secondary node failed", instance)
2490

    
2491
        if s_img.offline:
2492
          inst_nodes_offline.append(snode)
2493

    
2494
      # warn that the instance lives on offline nodes
2495
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2496
               "instance has offline secondary node(s) %s",
2497
               utils.CommaJoin(inst_nodes_offline))
2498
      # ... or ghost/non-vm_capable nodes
2499
      for node in inst_config.all_nodes:
2500
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2501
                 "instance lives on ghost node %s", node)
2502
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2503
                 instance, "instance lives on non-vm_capable node %s", node)
2504

    
2505
    feedback_fn("* Verifying orphan volumes")
2506
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2507
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2508

    
2509
    feedback_fn("* Verifying orphan instances")
2510
    self._VerifyOrphanInstances(instancelist, node_image)
2511

    
2512
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2513
      feedback_fn("* Verifying N+1 Memory redundancy")
2514
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2515

    
2516
    feedback_fn("* Other Notes")
2517
    if i_non_redundant:
2518
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2519
                  % len(i_non_redundant))
2520

    
2521
    if i_non_a_balanced:
2522
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2523
                  % len(i_non_a_balanced))
2524

    
2525
    if n_offline:
2526
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2527

    
2528
    if n_drained:
2529
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2530

    
2531
    return not self.bad
2532

    
2533
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
    instances = self.cfg.GetAllInstancesInfo().values()

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if not inst.admin_up:
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, [])
    for node, node_res in node_lvs.items():
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


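# Illustrative return value of LUClusterVerifyDisks.Exec above (hypothetical
# names):
#   ({"node2": "rpc error"},             # nodes that could not be queried
#    ["inst1.example.com"],              # instances needing activate-disks
#    {"inst2.example.com": [("node3", "lv-xyz")]})   # missing volumes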
class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


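# Illustrative note (not part of the original module): the parameter handling
# in LUClusterSetParams relies on objects.FillDict(defaults, custom), which
# returns a copy of the defaults overlaid with the custom values, e.g.
# (hedged, made-up values):
#
#   objects.FillDict({"kernel_path": "/boot/vmlinuz", "root_path": "/dev/vda1"},
#                    {"root_path": "/dev/xvda1"})
#   --> {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/xvda1"}
#
# CheckPrereq applies this layering to hvparams, os_hvp and osparams: cluster
# defaults first, then the more specific overrides on top.
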
def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)


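# Illustrative note (not part of the original module): the tuple returned by
# _ComputeAncillaryFiles is consumed as four independent sets, roughly:
#
#   (files_all,      # distributed to every online node
#    files_all_opt,  # must exist on all nodes or on none (e.g. RAPI users)
#    files_mc,       # master candidates only (config, when not redistributing)
#    files_vm)       # VM-capable nodes only (per-hypervisor ancillary files)
#
# Callers such as _RedistributeAncillaryFiles below unpack it in this order.
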
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


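# Illustrative note (not part of the original module): _WaitForSync adapts its
# polling interval to the estimated remaining sync time reported by the disks,
# capped at one minute (the ``time.sleep(min(60, max_time))`` above).  When
# the disks report no sync in progress but still look degraded, up to ten
# one-second retries are performed so that a transient state is not reported
# as a failure; the function returns False only if the disks stay degraded.
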
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


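# Illustrative note (not part of the original module): _CheckDiskConsistency
# recurses into dev.children, so one call on a composite device (e.g. DRBD on
# top of LVs) also checks the underlying devices.  With ldisk=True the test
# uses the local-storage status (constants.LDS_OKAY) instead of the overall
# is_degraded flag, e.g. (hedged sketch):
#
#   ok = _CheckDiskConsistency(self, dev, node, on_primary=False, ldisk=True)
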
class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = [_ExpandNodeName(self.cfg, name)
                            for name in self.op.node_names]
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(self.nodes):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))


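# Illustrative note (not part of the original module): each entry in the list
# returned by LUOobCommand.Exec is itself a list of (status, data) pairs,
# roughly (hypothetical payload values):
#
#   [(constants.RS_NORMAL, "node1.example.com"),  # the node name
#    (constants.RS_NORMAL, {"powered": True})]    # payload of the OOB call
#
# or, for a node without an OOB program configured:
#
#   [(constants.RS_NORMAL, "node2.example.com"),
#    (constants.RS_UNAVAIL, None)]
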
class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (lu.acquired_locks or self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      info.variants = list(variants)
      info.parameters = list(parameters)
      info.api_versions = list(api_versions)

      data[os_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]


class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)


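# Illustrative note (not part of the original module): for a plain OS listing
# (no names, only the "name" field requested), _BuildFilter above yields a
# status-only filter roughly equivalent to:
#
#   [qlang.OP_AND,
#    [qlang.OP_NOT, [qlang.OP_TRUE, "hidden"]],
#    [qlang.OP_NOT, [qlang.OP_TRUE, "blacklisted"]],
#    [qlang.OP_TRUE, "valid"]]
#
# i.e. hidden, blacklisted and invalid OSes are skipped unless the caller
# explicitly asks for those fields.
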
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node '%s', which is about to be removed, was not found"
                      " in the list of all nodes", self.op.node_name)
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    _RunPostHook(self, node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


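# Illustrative note (not part of the original module): node removal is ordered
# so that the candidate pool is re-adjusted and the post hooks are run while
# the node daemon is still reachable; only then is node_leave_cluster called,
# after which the node can no longer be contacted through Ganeti's RPC layer.
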
class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks[locking.LEVEL_NODE] = 1

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
                                        lu.cfg.GetHypervisorType())
      live_data = dict((name, nresult.payload)
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


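# Illustrative note (not part of the original module): _NodeQuery only takes
# node locks when live data (query.NQ_LIVE) is requested together with
# use_locking; purely static fields are answered from the configuration
# without locking.  Live data comes from the node_info RPC and is merged with
# the static node objects in query.NodeQueryData above.
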
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


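# Illustrative note (not part of the original module): LUNodeQueryvols returns
# one row per logical volume, each row holding the string values of the
# requested output fields in order.  For output_fields = ["node", "name",
# "size", "instance"] a row could look like (hypothetical values):
#
#   ["node1.example.com", "instance1-disk0", "10240", "instance1"]
#
# with "-" used in the instance column for volumes not owned by any instance.
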
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)