lib/cmdlib.py @ 1aef3df8
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62

    
63
import ganeti.masterd.instance # pylint: disable-msg=W0611
64

    
65

    
66
class ResultWithJobs:
67
  """Data container for LU results with jobs.
68

69
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
70
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
71
  contained in the C{jobs} attribute and include the job IDs in the opcode
72
  result.
73

74
  """
75
  def __init__(self, jobs, **kwargs):
76
    """Initializes this class.
77

78
    Additional return values can be specified as keyword arguments.
79

80
    @type jobs: list of lists of L{opcodes.OpCode}
81
    @param jobs: A list of lists of opcode objects
82

83
    """
84
    self.jobs = jobs
85
    self.other = kwargs
86

    
87

    
88
class LogicalUnit(object):
89
  """Logical Unit base class.
90

91
  Subclasses must follow these rules:
92
    - implement ExpandNames
93
    - implement CheckPrereq (except when tasklets are used)
94
    - implement Exec (except when tasklets are used)
95
    - implement BuildHooksEnv
96
    - implement BuildHooksNodes
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
105

106
  """
107
  HPATH = None
108
  HTYPE = None
109
  REQ_BGL = True
110

    
111
  def __init__(self, processor, op, context, rpc):
112
    """Constructor for LogicalUnit.
113

114
    This needs to be overridden in derived classes in order to check op
115
    validity.
116

117
    """
118
    self.proc = processor
119
    self.op = op
120
    self.cfg = context.cfg
121
    self.glm = context.glm
122
    self.context = context
123
    self.rpc = rpc
124
    # Dicts used to declare locking needs to mcpu
125
    self.needed_locks = None
126
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
127
    self.add_locks = {}
128
    self.remove_locks = {}
129
    # Used to force good behavior when calling helper functions
130
    self.recalculate_locks = {}
131
    # logging
132
    self.Log = processor.Log # pylint: disable-msg=C0103
133
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
134
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
135
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
136
    # support for dry-run
137
    self.dry_run_result = None
138
    # support for generic debug attribute
139
    if (not hasattr(self.op, "debug_level") or
140
        not isinstance(self.op.debug_level, int)):
141
      self.op.debug_level = 0
142

    
143
    # Tasklets
144
    self.tasklets = None
145

    
146
    # Validate opcode parameters and set defaults
147
    self.op.Validate(True)
148

    
149
    self.CheckArguments()
150

    
151
  def CheckArguments(self):
152
    """Check syntactic validity for the opcode arguments.
153

154
    This method is for doing a simple syntactic check and ensure
155
    validity of opcode parameters, without any cluster-related
156
    checks. While the same can be accomplished in ExpandNames and/or
157
    CheckPrereq, doing these separately is better because:
158

159
      - ExpandNames is left as purely a lock-related function
160
      - CheckPrereq is run after we have acquired locks (and possibly
161
        waited for them)
162

163
    The function is allowed to change the self.op attribute so that
164
    later methods need not worry about missing parameters.
165

166
    """
167
    pass
168

    
169
  def ExpandNames(self):
170
    """Expand names for this LU.
171

172
    This method is called before starting to execute the opcode, and it should
173
    update all the parameters of the opcode to their canonical form (e.g. a
174
    short node name must be fully expanded after this method has successfully
175
    completed). This way locking, hooks, logging, etc. can work correctly.
176

177
    LUs which implement this method must also populate the self.needed_locks
178
    member, as a dict with lock levels as keys, and a list of needed lock names
179
    as values. Rules:
180

181
      - use an empty dict if you don't need any lock
182
      - if you don't need any lock at a particular level omit that level
183
      - don't put anything for the BGL level
184
      - if you want all locks at a level use locking.ALL_SET as a value
185

186
    If you need to share locks (rather than acquire them exclusively) at one
187
    level you can modify self.share_locks, setting a true value (usually 1) for
188
    that level. By default locks are not shared.
189

190
    This function can also define a list of tasklets, which then will be
191
    executed in order instead of the usual LU-level CheckPrereq and Exec
192
    functions, if those are not defined by the LU.
193

194
    Examples::
195

196
      # Acquire all nodes and one instance
197
      self.needed_locks = {
198
        locking.LEVEL_NODE: locking.ALL_SET,
199
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
200
      }
201
      # Acquire just two nodes
202
      self.needed_locks = {
203
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
204
      }
205
      # Acquire no locks
206
      self.needed_locks = {} # No, you can't leave it to the default value None
207

208
    """
209
    # The implementation of this method is mandatory only if the new LU is
210
    # concurrent, so that old LUs don't need to be changed all at the same
211
    # time.
212
    if self.REQ_BGL:
213
      self.needed_locks = {} # Exclusive LUs don't need locks.
214
    else:
215
      raise NotImplementedError
216

    
217
  def DeclareLocks(self, level):
218
    """Declare LU locking needs for a level
219

220
    While most LUs can just declare their locking needs at ExpandNames time,
221
    sometimes there's the need to calculate some locks after having acquired
222
    the ones before. This function is called just before acquiring locks at a
223
    particular level, but after acquiring the ones at lower levels, and permits
224
    such calculations. It can be used to modify self.needed_locks, and by
225
    default it does nothing.
226

227
    This function is only called if you have something already set in
228
    self.needed_locks for the level.
229

230
    @param level: Locking level which is going to be locked
231
    @type level: member of ganeti.locking.LEVELS
232

233
    """
234

    
235
  def CheckPrereq(self):
236
    """Check prerequisites for this LU.
237

238
    This method should check that the prerequisites for the execution
239
    of this LU are fulfilled. It can do internode communication, but
240
    it should be idempotent - no cluster or system changes are
241
    allowed.
242

243
    The method should raise errors.OpPrereqError in case something is
244
    not fulfilled. Its return value is ignored.
245

246
    This method should also update all the parameters of the opcode to
247
    their canonical form if it hasn't been done by ExpandNames before.
248

249
    """
250
    if self.tasklets is not None:
251
      for (idx, tl) in enumerate(self.tasklets):
252
        logging.debug("Checking prerequisites for tasklet %s/%s",
253
                      idx + 1, len(self.tasklets))
254
        tl.CheckPrereq()
255
    else:
256
      pass
257

    
258
  def Exec(self, feedback_fn):
259
    """Execute the LU.
260

261
    This method should implement the actual work. It should raise
262
    errors.OpExecError for failures that are somewhat dealt with in
263
    code, or expected.
264

265
    """
266
    if self.tasklets is not None:
267
      for (idx, tl) in enumerate(self.tasklets):
268
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
269
        tl.Exec(feedback_fn)
270
    else:
271
      raise NotImplementedError
272

    
273
  def BuildHooksEnv(self):
274
    """Build hooks environment for this LU.
275

276
    @rtype: dict
277
    @return: Dictionary containing the environment that will be used for
278
      running the hooks for this LU. The keys of the dict must not be prefixed
279
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
280
      will extend the environment with additional variables. If no environment
281
      should be defined, an empty dictionary should be returned (not C{None}).
282
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
283
      will not be called.
284

285
    """
286
    raise NotImplementedError
287

    
288
  def BuildHooksNodes(self):
289
    """Build list of nodes to run LU's hooks.
290

291
    @rtype: tuple; (list, list)
292
    @return: Tuple containing a list of node names on which the hook
293
      should run before the execution and a list of node names on which the
294
      hook should run after the execution. No nodes should be returned as an
295
      empty list (and not None).
296
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
297
      will not be called.
298

299
    """
300
    raise NotImplementedError
301

    
302
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
303
    """Notify the LU about the results of its hooks.
304

305
    This method is called every time a hooks phase is executed, and notifies
306
    the Logical Unit about the hooks' result. The LU can then use it to alter
307
    its result based on the hooks.  By default the method does nothing and the
308
    previous result is passed back unchanged but any LU can define it if it
309
    wants to use the local cluster hook-scripts somehow.
310

311
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
312
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
313
    @param hook_results: the results of the multi-node hooks rpc call
314
    @param feedback_fn: function used to send feedback back to the caller
315
    @param lu_result: the previous Exec result this LU had, or None
316
        in the PRE phase
317
    @return: the new Exec result, based on the previous result
318
        and hook results
319

320
    """
321
    # API must be kept, thus we ignore the unused argument and the
322
    # "could be a function" warnings
323
    # pylint: disable-msg=W0613,R0201
324
    return lu_result
325

    
326
  def _ExpandAndLockInstance(self):
327
    """Helper function to expand and lock an instance.
328

329
    Many LUs that work on an instance take its name in self.op.instance_name
330
    and need to expand it and then declare the expanded name for locking. This
331
    function does it, and then updates self.op.instance_name to the expanded
332
    name. It also initializes needed_locks as a dict, if this hasn't been done
333
    before.
334

335
    """
336
    if self.needed_locks is None:
337
      self.needed_locks = {}
338
    else:
339
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
340
        "_ExpandAndLockInstance called with instance-level locks set"
341
    self.op.instance_name = _ExpandInstanceName(self.cfg,
342
                                                self.op.instance_name)
343
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
344

    
345
  def _LockInstancesNodes(self, primary_only=False):
346
    """Helper function to declare instances' nodes for locking.
347

348
    This function should be called after locking one or more instances to lock
349
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
350
    with all primary or secondary nodes for instances already locked and
351
    present in self.needed_locks[locking.LEVEL_INSTANCE].
352

353
    It should be called from DeclareLocks, and for safety only works if
354
    self.recalculate_locks[locking.LEVEL_NODE] is set.
355

356
    In the future it may grow parameters to just lock some instance's nodes, or
357
    to just lock primaries or secondary nodes, if needed.
358

359
    It should be called in DeclareLocks in a way similar to::
360

361
      if level == locking.LEVEL_NODE:
362
        self._LockInstancesNodes()
363

364
    @type primary_only: boolean
365
    @param primary_only: only lock primary nodes of locked instances
366

367
    """
368
    assert locking.LEVEL_NODE in self.recalculate_locks, \
369
      "_LockInstancesNodes helper function called with no nodes to recalculate"
370

    
371
    # TODO: check if we've really been called with the instance locks held
372

    
373
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
374
    # future we might want to have different behaviors depending on the value
375
    # of self.recalculate_locks[locking.LEVEL_NODE]
376
    wanted_nodes = []
377
    locked_i = self.glm.list_owned(locking.LEVEL_INSTANCE)
378
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
379
      wanted_nodes.append(instance.primary_node)
380
      if not primary_only:
381
        wanted_nodes.extend(instance.secondary_nodes)
382

    
383
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
384
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
385
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
386
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
387

    
388
    del self.recalculate_locks[locking.LEVEL_NODE]
389

    
390

    
391
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
392
  """Simple LU which runs no hooks.
393

394
  This LU is intended as a parent for other LogicalUnits which will
395
  run no hooks, in order to reduce duplicate code.
396

397
  """
398
  HPATH = None
399
  HTYPE = None
400

    
401
  def BuildHooksEnv(self):
402
    """Empty BuildHooksEnv for NoHooksLu.
403

404
    This just raises an error.
405

406
    """
407
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
408

    
409
  def BuildHooksNodes(self):
410
    """Empty BuildHooksNodes for NoHooksLU.
411

412
    """
413
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
414

    
415
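# Editor's note: the following is an illustrative sketch only (not part of
# the original module), showing how the LogicalUnit contract described above
# is typically fulfilled by a small LU. The class name and the opcode field
# ``node_name`` are hypothetical; helpers such as _ExpandNodeName and
# _CheckNodeOnline are defined later in this module.
#
#   class LUNodePing(NoHooksLU):
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
#       self.needed_locks = {
#         locking.LEVEL_NODE: [self.op.node_name],
#         }
#
#     def CheckPrereq(self):
#       _CheckNodeOnline(self, self.op.node_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Checked node %s" % self.op.node_name)
#       return self.op.node_name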

    
416
class Tasklet:
417
  """Tasklet base class.
418

419
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
420
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
421
  tasklets know nothing about locks.
422

423
  Subclasses must follow these rules:
424
    - Implement CheckPrereq
425
    - Implement Exec
426

427
  """
428
  def __init__(self, lu):
429
    self.lu = lu
430

    
431
    # Shortcuts
432
    self.cfg = lu.cfg
433
    self.rpc = lu.rpc
434

    
435
  def CheckPrereq(self):
436
    """Check prerequisites for this tasklets.
437

438
    This method should check whether the prerequisites for the execution of
439
    this tasklet are fulfilled. It can do internode communication, but it
440
    should be idempotent - no cluster or system changes are allowed.
441

442
    The method should raise errors.OpPrereqError in case something is not
443
    fulfilled. Its return value is ignored.
444

445
    This method should also update all parameters to their canonical form if it
446
    hasn't been done before.
447

448
    """
449
    pass
450

    
451
  def Exec(self, feedback_fn):
452
    """Execute the tasklet.
453

454
    This method should implement the actual work. It should raise
455
    errors.OpExecError for failures that are somewhat dealt with in code, or
456
    expected.
457

458
    """
459
    raise NotImplementedError
460

    
461

    
462
class _QueryBase:
463
  """Base for query utility classes.
464

465
  """
466
  #: Attribute holding field definitions
467
  FIELDS = None
468

    
469
  def __init__(self, filter_, fields, use_locking):
470
    """Initializes this class.
471

472
    """
473
    self.use_locking = use_locking
474

    
475
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
476
                             namefield="name")
477
    self.requested_data = self.query.RequestedData()
478
    self.names = self.query.RequestedNames()
479

    
480
    # Sort only if no names were requested
481
    self.sort_by_name = not self.names
482

    
483
    self.do_locking = None
484
    self.wanted = None
485

    
486
  def _GetNames(self, lu, all_names, lock_level):
487
    """Helper function to determine names asked for in the query.
488

489
    """
490
    if self.do_locking:
491
      names = lu.glm.list_owned(lock_level)
492
    else:
493
      names = all_names
494

    
495
    if self.wanted == locking.ALL_SET:
496
      assert not self.names
497
      # caller didn't specify names, so ordering is not important
498
      return utils.NiceSort(names)
499

    
500
    # caller specified names and we must keep the same order
501
    assert self.names
502
    assert not self.do_locking or lu.glm.is_owned(lock_level)
503

    
504
    missing = set(self.wanted).difference(names)
505
    if missing:
506
      raise errors.OpExecError("Some items were removed before retrieving"
507
                               " their data: %s" % missing)
508

    
509
    # Return expanded names
510
    return self.wanted
511

    
512
  def ExpandNames(self, lu):
513
    """Expand names for this query.
514

515
    See L{LogicalUnit.ExpandNames}.
516

517
    """
518
    raise NotImplementedError()
519

    
520
  def DeclareLocks(self, lu, level):
521
    """Declare locks for this query.
522

523
    See L{LogicalUnit.DeclareLocks}.
524

525
    """
526
    raise NotImplementedError()
527

    
528
  def _GetQueryData(self, lu):
529
    """Collects all data for this query.
530

531
    @return: Query data object
532

533
    """
534
    raise NotImplementedError()
535

    
536
  def NewStyleQuery(self, lu):
537
    """Collect data and execute query.
538

539
    """
540
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
541
                                  sort_by_name=self.sort_by_name)
542

    
543
  def OldStyleQuery(self, lu):
544
    """Collect data and execute query.
545

546
    """
547
    return self.query.OldStyleQuery(self._GetQueryData(lu),
548
                                    sort_by_name=self.sort_by_name)
549

    
550

    
551
def _ShareAll():
552
  """Returns a dict declaring all lock levels shared.
553

554
  """
555
  return dict.fromkeys(locking.LEVELS, 1)
556

    
557

    
558
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
559
  """Checks if the owned node groups are still correct for an instance.
560

561
  @type cfg: L{config.ConfigWriter}
562
  @param cfg: The cluster configuration
563
  @type instance_name: string
564
  @param instance_name: Instance name
565
  @type owned_groups: set or frozenset
566
  @param owned_groups: List of currently owned node groups
567

568
  """
569
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
570

    
571
  if not owned_groups.issuperset(inst_groups):
572
    raise errors.OpPrereqError("Instance %s's node groups changed since"
573
                               " locks were acquired, current groups are"
574
                               " are '%s', owning groups '%s'; retry the"
575
                               " operation" %
576
                               (instance_name,
577
                                utils.CommaJoin(inst_groups),
578
                                utils.CommaJoin(owned_groups)),
579
                               errors.ECODE_STATE)
580

    
581
  return inst_groups
582

    
583

    
584
def _SupportsOob(cfg, node):
585
  """Tells if node supports OOB.
586

587
  @type cfg: L{config.ConfigWriter}
588
  @param cfg: The cluster configuration
589
  @type node: L{objects.Node}
590
  @param node: The node
591
  @return: The OOB script if supported or an empty string otherwise
592

593
  """
594
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
595

    
596

    
597
def _GetWantedNodes(lu, nodes):
598
  """Returns list of checked and expanded node names.
599

600
  @type lu: L{LogicalUnit}
601
  @param lu: the logical unit on whose behalf we execute
602
  @type nodes: list
603
  @param nodes: list of node names or None for all nodes
604
  @rtype: list
605
  @return: the list of nodes, sorted
606
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
607

608
  """
609
  if nodes:
610
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
611

    
612
  return utils.NiceSort(lu.cfg.GetNodeList())
613

    
614

    
615
def _GetWantedInstances(lu, instances):
616
  """Returns list of checked and expanded instance names.
617

618
  @type lu: L{LogicalUnit}
619
  @param lu: the logical unit on whose behalf we execute
620
  @type instances: list
621
  @param instances: list of instance names or None for all instances
622
  @rtype: list
623
  @return: the list of instances, sorted
624
  @raise errors.OpPrereqError: if the instances parameter is wrong type
625
  @raise errors.OpPrereqError: if any of the passed instances is not found
626

627
  """
628
  if instances:
629
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
630
  else:
631
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
632
  return wanted
633

    
634

    
635
def _GetUpdatedParams(old_params, update_dict,
636
                      use_default=True, use_none=False):
637
  """Return the new version of a parameter dictionary.
638

639
  @type old_params: dict
640
  @param old_params: old parameters
641
  @type update_dict: dict
642
  @param update_dict: dict containing new parameter values, or
643
      constants.VALUE_DEFAULT to reset the parameter to its default
644
      value
645
  @type use_default: boolean
646
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
647
      values as 'to be deleted' values
648
  @type use_none: boolean
649
  @param use_none: whether to recognise C{None} values as 'to be
650
      deleted' values
651
  @rtype: dict
652
  @return: the new parameter dictionary
653

654
  """
655
  params_copy = copy.deepcopy(old_params)
656
  for key, val in update_dict.iteritems():
657
    if ((use_default and val == constants.VALUE_DEFAULT) or
658
        (use_none and val is None)):
659
      try:
660
        del params_copy[key]
661
      except KeyError:
662
        pass
663
    else:
664
      params_copy[key] = val
665
  return params_copy
666

    
667
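# Editor's note: illustrative example only (not part of the original
# module). The parameter names and values below are made up; the point is
# that constants.VALUE_DEFAULT removes a key, while any other value
# overwrites or adds it.
#
#   old = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}
#   update = {"root_path": constants.VALUE_DEFAULT, "serial_console": True}
#   _GetUpdatedParams(old, update)
#   --> {"kernel_path": "/boot/vmlinuz", "serial_console": True}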

    
668
def _ReleaseLocks(lu, level, names=None, keep=None):
669
  """Releases locks owned by an LU.
670

671
  @type lu: L{LogicalUnit}
672
  @param level: Lock level
673
  @type names: list or None
674
  @param names: Names of locks to release
675
  @type keep: list or None
676
  @param keep: Names of locks to retain
677

678
  """
679
  assert not (keep is not None and names is not None), \
680
         "Only one of the 'names' and the 'keep' parameters can be given"
681

    
682
  if names is not None:
683
    should_release = names.__contains__
684
  elif keep:
685
    should_release = lambda name: name not in keep
686
  else:
687
    should_release = None
688

    
689
  if should_release:
690
    retain = []
691
    release = []
692

    
693
    # Determine which locks to release
694
    for name in lu.glm.list_owned(level):
695
      if should_release(name):
696
        release.append(name)
697
      else:
698
        retain.append(name)
699

    
700
    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
701

    
702
    # Release just some locks
703
    lu.glm.release(level, names=release)
704

    
705
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
706
  else:
707
    # Release everything
708
    lu.glm.release(level)
709

    
710
    assert not lu.glm.is_owned(level), "No locks should be owned"
711

    
712
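# Editor's note: illustrative usage only (not part of the original module).
# A typical call from an LU's Exec, once only the instance's own nodes need
# to stay locked; the variable names are hypothetical.
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[instance.primary_node] +
#                      list(instance.secondary_nodes))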

    
713
def _MapInstanceDisksToNodes(instances):
714
  """Creates a map from (node, volume) to instance name.
715

716
  @type instances: list of L{objects.Instance}
717
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
718

719
  """
720
  return dict(((node, vol), inst.name)
721
              for inst in instances
722
              for (node, vols) in inst.MapLVsByNode().items()
723
              for vol in vols)
724

    
725
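# Editor's note: illustrative example only (not part of the original
# module), showing the shape of the mapping returned above. Node, volume
# and instance names are made up.
#
#   _MapInstanceDisksToNodes([inst1, inst2])
#   --> {("node1.example.com", "xenvg/disk0"): "inst1.example.com",
#        ("node2.example.com", "xenvg/disk0"): "inst1.example.com",
#        ("node1.example.com", "xenvg/disk1"): "inst2.example.com"}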

    
726
def _RunPostHook(lu, node_name):
727
  """Runs the post-hook for an opcode on a single node.
728

729
  """
730
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
731
  try:
732
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
733
  except:
734
    # pylint: disable-msg=W0702
735
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
736

    
737

    
738
def _CheckOutputFields(static, dynamic, selected):
739
  """Checks whether all selected fields are valid.
740

741
  @type static: L{utils.FieldSet}
742
  @param static: static fields set
743
  @type dynamic: L{utils.FieldSet}
744
  @param dynamic: dynamic fields set
745

746
  """
747
  f = utils.FieldSet()
748
  f.Extend(static)
749
  f.Extend(dynamic)
750

    
751
  delta = f.NonMatching(selected)
752
  if delta:
753
    raise errors.OpPrereqError("Unknown output fields selected: %s"
754
                               % ",".join(delta), errors.ECODE_INVAL)
755

    
756

    
757
def _CheckGlobalHvParams(params):
758
  """Validates that given hypervisor params are not global ones.
759

760
  This will ensure that instances don't get customised versions of
761
  global params.
762

763
  """
764
  used_globals = constants.HVC_GLOBALS.intersection(params)
765
  if used_globals:
766
    msg = ("The following hypervisor parameters are global and cannot"
767
           " be customized at instance level, please modify them at"
768
           " cluster level: %s" % utils.CommaJoin(used_globals))
769
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
770

    
771

    
772
def _CheckNodeOnline(lu, node, msg=None):
773
  """Ensure that a given node is online.
774

775
  @param lu: the LU on behalf of which we make the check
776
  @param node: the node to check
777
  @param msg: if passed, should be a message to replace the default one
778
  @raise errors.OpPrereqError: if the node is offline
779

780
  """
781
  if msg is None:
782
    msg = "Can't use offline node"
783
  if lu.cfg.GetNodeInfo(node).offline:
784
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
785

    
786

    
787
def _CheckNodeNotDrained(lu, node):
788
  """Ensure that a given node is not drained.
789

790
  @param lu: the LU on behalf of which we make the check
791
  @param node: the node to check
792
  @raise errors.OpPrereqError: if the node is drained
793

794
  """
795
  if lu.cfg.GetNodeInfo(node).drained:
796
    raise errors.OpPrereqError("Can't use drained node %s" % node,
797
                               errors.ECODE_STATE)
798

    
799

    
800
def _CheckNodeVmCapable(lu, node):
801
  """Ensure that a given node is vm capable.
802

803
  @param lu: the LU on behalf of which we make the check
804
  @param node: the node to check
805
  @raise errors.OpPrereqError: if the node is not vm capable
806

807
  """
808
  if not lu.cfg.GetNodeInfo(node).vm_capable:
809
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
810
                               errors.ECODE_STATE)
811

    
812

    
813
def _CheckNodeHasOS(lu, node, os_name, force_variant):
814
  """Ensure that a node supports a given OS.
815

816
  @param lu: the LU on behalf of which we make the check
817
  @param node: the node to check
818
  @param os_name: the OS to query about
819
  @param force_variant: whether to ignore variant errors
820
  @raise errors.OpPrereqError: if the node does not support the OS
821

822
  """
823
  result = lu.rpc.call_os_get(node, os_name)
824
  result.Raise("OS '%s' not in supported OS list for node %s" %
825
               (os_name, node),
826
               prereq=True, ecode=errors.ECODE_INVAL)
827
  if not force_variant:
828
    _CheckOSVariant(result.payload, os_name)
829

    
830

    
831
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
832
  """Ensure that a node has the given secondary ip.
833

834
  @type lu: L{LogicalUnit}
835
  @param lu: the LU on behalf of which we make the check
836
  @type node: string
837
  @param node: the node to check
838
  @type secondary_ip: string
839
  @param secondary_ip: the ip to check
840
  @type prereq: boolean
841
  @param prereq: whether to throw a prerequisite or an execute error
842
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
843
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
844

845
  """
846
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
847
  result.Raise("Failure checking secondary ip on node %s" % node,
848
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
849
  if not result.payload:
850
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
851
           " please fix and re-run this command" % secondary_ip)
852
    if prereq:
853
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
854
    else:
855
      raise errors.OpExecError(msg)
856

    
857

    
858
def _GetClusterDomainSecret():
859
  """Reads the cluster domain secret.
860

861
  """
862
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
863
                               strict=True)
864

    
865

    
866
def _CheckInstanceDown(lu, instance, reason):
867
  """Ensure that an instance is not running."""
868
  if instance.admin_up:
869
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
870
                               (instance.name, reason), errors.ECODE_STATE)
871

    
872
  pnode = instance.primary_node
873
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
874
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
875
              prereq=True, ecode=errors.ECODE_ENVIRON)
876

    
877
  if instance.name in ins_l.payload:
878
    raise errors.OpPrereqError("Instance %s is running, %s" %
879
                               (instance.name, reason), errors.ECODE_STATE)
880

    
881

    
882
def _ExpandItemName(fn, name, kind):
883
  """Expand an item name.
884

885
  @param fn: the function to use for expansion
886
  @param name: requested item name
887
  @param kind: text description ('Node' or 'Instance')
888
  @return: the resolved (full) name
889
  @raise errors.OpPrereqError: if the item is not found
890

891
  """
892
  full_name = fn(name)
893
  if full_name is None:
894
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
895
                               errors.ECODE_NOENT)
896
  return full_name
897

    
898

    
899
def _ExpandNodeName(cfg, name):
900
  """Wrapper over L{_ExpandItemName} for nodes."""
901
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
902

    
903

    
904
def _ExpandInstanceName(cfg, name):
905
  """Wrapper over L{_ExpandItemName} for instance."""
906
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
907

    
908

    
909
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
910
                          memory, vcpus, nics, disk_template, disks,
911
                          bep, hvp, hypervisor_name, tags):
912
  """Builds instance related env variables for hooks
913

914
  This builds the hook environment from individual variables.
915

916
  @type name: string
917
  @param name: the name of the instance
918
  @type primary_node: string
919
  @param primary_node: the name of the instance's primary node
920
  @type secondary_nodes: list
921
  @param secondary_nodes: list of secondary nodes as strings
922
  @type os_type: string
923
  @param os_type: the name of the instance's OS
924
  @type status: boolean
925
  @param status: the should_run status of the instance
926
  @type memory: string
927
  @param memory: the memory size of the instance
928
  @type vcpus: string
929
  @param vcpus: the count of VCPUs the instance has
930
  @type nics: list
931
  @param nics: list of tuples (ip, mac, mode, link) representing
932
      the NICs the instance has
933
  @type disk_template: string
934
  @param disk_template: the disk template of the instance
935
  @type disks: list
936
  @param disks: the list of (size, mode) pairs
937
  @type bep: dict
938
  @param bep: the backend parameters for the instance
939
  @type hvp: dict
940
  @param hvp: the hypervisor parameters for the instance
941
  @type hypervisor_name: string
942
  @param hypervisor_name: the hypervisor for the instance
943
  @type tags: list
944
  @param tags: list of instance tags as strings
945
  @rtype: dict
946
  @return: the hook environment for this instance
947

948
  """
949
  if status:
950
    str_status = "up"
951
  else:
952
    str_status = "down"
953
  env = {
954
    "OP_TARGET": name,
955
    "INSTANCE_NAME": name,
956
    "INSTANCE_PRIMARY": primary_node,
957
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
958
    "INSTANCE_OS_TYPE": os_type,
959
    "INSTANCE_STATUS": str_status,
960
    "INSTANCE_MEMORY": memory,
961
    "INSTANCE_VCPUS": vcpus,
962
    "INSTANCE_DISK_TEMPLATE": disk_template,
963
    "INSTANCE_HYPERVISOR": hypervisor_name,
964
  }
965

    
966
  if nics:
967
    nic_count = len(nics)
968
    for idx, (ip, mac, mode, link) in enumerate(nics):
969
      if ip is None:
970
        ip = ""
971
      env["INSTANCE_NIC%d_IP" % idx] = ip
972
      env["INSTANCE_NIC%d_MAC" % idx] = mac
973
      env["INSTANCE_NIC%d_MODE" % idx] = mode
974
      env["INSTANCE_NIC%d_LINK" % idx] = link
975
      if mode == constants.NIC_MODE_BRIDGED:
976
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
977
  else:
978
    nic_count = 0
979

    
980
  env["INSTANCE_NIC_COUNT"] = nic_count
981

    
982
  if disks:
983
    disk_count = len(disks)
984
    for idx, (size, mode) in enumerate(disks):
985
      env["INSTANCE_DISK%d_SIZE" % idx] = size
986
      env["INSTANCE_DISK%d_MODE" % idx] = mode
987
  else:
988
    disk_count = 0
989

    
990
  env["INSTANCE_DISK_COUNT"] = disk_count
991

    
992
  if not tags:
993
    tags = []
994

    
995
  env["INSTANCE_TAGS"] = " ".join(tags)
996

    
997
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
998
    for key, value in source.items():
999
      env["INSTANCE_%s_%s" % (kind, key)] = value
1000

    
1001
  return env
1002

    
1003
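# Editor's note: illustrative example only (not part of the original
# module): a subset of the environment built above, for an instance with
# one NIC and one disk. All values shown are hypothetical.
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     ...
#   }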

    
1004
def _NICListToTuple(lu, nics):
1005
  """Build a list of nic information tuples.
1006

1007
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1008
  value in LUInstanceQueryData.
1009

1010
  @type lu:  L{LogicalUnit}
1011
  @param lu: the logical unit on whose behalf we execute
1012
  @type nics: list of L{objects.NIC}
1013
  @param nics: list of nics to convert to hooks tuples
1014

1015
  """
1016
  hooks_nics = []
1017
  cluster = lu.cfg.GetClusterInfo()
1018
  for nic in nics:
1019
    ip = nic.ip
1020
    mac = nic.mac
1021
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1022
    mode = filled_params[constants.NIC_MODE]
1023
    link = filled_params[constants.NIC_LINK]
1024
    hooks_nics.append((ip, mac, mode, link))
1025
  return hooks_nics
1026

    
1027

    
1028
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1029
  """Builds instance related env variables for hooks from an object.
1030

1031
  @type lu: L{LogicalUnit}
1032
  @param lu: the logical unit on whose behalf we execute
1033
  @type instance: L{objects.Instance}
1034
  @param instance: the instance for which we should build the
1035
      environment
1036
  @type override: dict
1037
  @param override: dictionary with key/values that will override
1038
      our values
1039
  @rtype: dict
1040
  @return: the hook environment dictionary
1041

1042
  """
1043
  cluster = lu.cfg.GetClusterInfo()
1044
  bep = cluster.FillBE(instance)
1045
  hvp = cluster.FillHV(instance)
1046
  args = {
1047
    "name": instance.name,
1048
    "primary_node": instance.primary_node,
1049
    "secondary_nodes": instance.secondary_nodes,
1050
    "os_type": instance.os,
1051
    "status": instance.admin_up,
1052
    "memory": bep[constants.BE_MEMORY],
1053
    "vcpus": bep[constants.BE_VCPUS],
1054
    "nics": _NICListToTuple(lu, instance.nics),
1055
    "disk_template": instance.disk_template,
1056
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1057
    "bep": bep,
1058
    "hvp": hvp,
1059
    "hypervisor_name": instance.hypervisor,
1060
    "tags": instance.tags,
1061
  }
1062
  if override:
1063
    args.update(override)
1064
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1065

    
1066

    
1067
def _AdjustCandidatePool(lu, exceptions):
1068
  """Adjust the candidate pool after node operations.
1069

1070
  """
1071
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1072
  if mod_list:
1073
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1074
               utils.CommaJoin(node.name for node in mod_list))
1075
    for name in mod_list:
1076
      lu.context.ReaddNode(name)
1077
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1078
  if mc_now > mc_max:
1079
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1080
               (mc_now, mc_max))
1081

    
1082

    
1083
def _DecideSelfPromotion(lu, exceptions=None):
1084
  """Decide whether I should promote myself as a master candidate.
1085

1086
  """
1087
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1088
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1089
  # the new node will increase mc_max by one, so:
1090
  mc_should = min(mc_should + 1, cp_size)
1091
  return mc_now < mc_should
1092

    
1093

    
1094
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1095
  """Check that the brigdes needed by a list of nics exist.
1096

1097
  """
1098
  cluster = lu.cfg.GetClusterInfo()
1099
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1100
  brlist = [params[constants.NIC_LINK] for params in paramslist
1101
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1102
  if brlist:
1103
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1104
    result.Raise("Error checking bridges on destination node '%s'" %
1105
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1106

    
1107

    
1108
def _CheckInstanceBridgesExist(lu, instance, node=None):
1109
  """Check that the brigdes needed by an instance exist.
1110

1111
  """
1112
  if node is None:
1113
    node = instance.primary_node
1114
  _CheckNicsBridgesExist(lu, instance.nics, node)
1115

    
1116

    
1117
def _CheckOSVariant(os_obj, name):
1118
  """Check whether an OS name conforms to the os variants specification.
1119

1120
  @type os_obj: L{objects.OS}
1121
  @param os_obj: OS object to check
1122
  @type name: string
1123
  @param name: OS name passed by the user, to check for validity
1124

1125
  """
1126
  variant = objects.OS.GetVariant(name)
1127
  if not os_obj.supported_variants:
1128
    if variant:
1129
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1130
                                 " passed)" % (os_obj.name, variant),
1131
                                 errors.ECODE_INVAL)
1132
    return
1133
  if not variant:
1134
    raise errors.OpPrereqError("OS name must include a variant",
1135
                               errors.ECODE_INVAL)
1136

    
1137
  if variant not in os_obj.supported_variants:
1138
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1139

    
1140

    
1141
def _GetNodeInstancesInner(cfg, fn):
1142
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1143

    
1144

    
1145
def _GetNodeInstances(cfg, node_name):
1146
  """Returns a list of all primary and secondary instances on a node.
1147

1148
  """
1149

    
1150
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1151

    
1152

    
1153
def _GetNodePrimaryInstances(cfg, node_name):
1154
  """Returns primary instances on a node.
1155

1156
  """
1157
  return _GetNodeInstancesInner(cfg,
1158
                                lambda inst: node_name == inst.primary_node)
1159

    
1160

    
1161
def _GetNodeSecondaryInstances(cfg, node_name):
1162
  """Returns secondary instances on a node.
1163

1164
  """
1165
  return _GetNodeInstancesInner(cfg,
1166
                                lambda inst: node_name in inst.secondary_nodes)
1167

    
1168

    
1169
def _GetStorageTypeArgs(cfg, storage_type):
1170
  """Returns the arguments for a storage type.
1171

1172
  """
1173
  # Special case for file storage
1174
  if storage_type == constants.ST_FILE:
1175
    # storage.FileStorage wants a list of storage directories
1176
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1177

    
1178
  return []
1179

    
1180

    
1181
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1182
  faulty = []
1183

    
1184
  for dev in instance.disks:
1185
    cfg.SetDiskID(dev, node_name)
1186

    
1187
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1188
  result.Raise("Failed to get disk status from node %s" % node_name,
1189
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1190

    
1191
  for idx, bdev_status in enumerate(result.payload):
1192
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1193
      faulty.append(idx)
1194

    
1195
  return faulty
1196

    
1197

    
1198
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1199
  """Check the sanity of iallocator and node arguments and use the
1200
  cluster-wide iallocator if appropriate.
1201

1202
  Check that at most one of (iallocator, node) is specified. If none is
1203
  specified, then the LU's opcode's iallocator slot is filled with the
1204
  cluster-wide default iallocator.
1205

1206
  @type iallocator_slot: string
1207
  @param iallocator_slot: the name of the opcode iallocator slot
1208
  @type node_slot: string
1209
  @param node_slot: the name of the opcode target node slot
1210

1211
  """
1212
  node = getattr(lu.op, node_slot, None)
1213
  iallocator = getattr(lu.op, iallocator_slot, None)
1214

    
1215
  if node is not None and iallocator is not None:
1216
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1217
                               errors.ECODE_INVAL)
1218
  elif node is None and iallocator is None:
1219
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1220
    if default_iallocator:
1221
      setattr(lu.op, iallocator_slot, default_iallocator)
1222
    else:
1223
      raise errors.OpPrereqError("No iallocator or node given and no"
1224
                                 " cluster-wide default iallocator found;"
1225
                                 " please specify either an iallocator or a"
1226
                                 " node, or set a cluster-wide default"
1227
                                 " iallocator")
1228

    
1229

    
1230
def _GetDefaultIAllocator(cfg, iallocator):
1231
  """Decides on which iallocator to use.
1232

1233
  @type cfg: L{config.ConfigWriter}
1234
  @param cfg: Cluster configuration object
1235
  @type iallocator: string or None
1236
  @param iallocator: Iallocator specified in opcode
1237
  @rtype: string
1238
  @return: Iallocator name
1239

1240
  """
1241
  if not iallocator:
1242
    # Use default iallocator
1243
    iallocator = cfg.GetDefaultIAllocator()
1244

    
1245
  if not iallocator:
1246
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1247
                               " opcode nor as a cluster-wide default",
1248
                               errors.ECODE_INVAL)
1249

    
1250
  return iallocator
1251

    
1252

    
1253
class LUClusterPostInit(LogicalUnit):
1254
  """Logical unit for running hooks after cluster initialization.
1255

1256
  """
1257
  HPATH = "cluster-init"
1258
  HTYPE = constants.HTYPE_CLUSTER
1259

    
1260
  def BuildHooksEnv(self):
1261
    """Build hooks env.
1262

1263
    """
1264
    return {
1265
      "OP_TARGET": self.cfg.GetClusterName(),
1266
      }
1267

    
1268
  def BuildHooksNodes(self):
1269
    """Build hooks nodes.
1270

1271
    """
1272
    return ([], [self.cfg.GetMasterNode()])
1273

    
1274
  def Exec(self, feedback_fn):
1275
    """Nothing to do.
1276

1277
    """
1278
    return True
1279

    
1280

    
1281
class LUClusterDestroy(LogicalUnit):
1282
  """Logical unit for destroying the cluster.
1283

1284
  """
1285
  HPATH = "cluster-destroy"
1286
  HTYPE = constants.HTYPE_CLUSTER
1287

    
1288
  def BuildHooksEnv(self):
1289
    """Build hooks env.
1290

1291
    """
1292
    return {
1293
      "OP_TARGET": self.cfg.GetClusterName(),
1294
      }
1295

    
1296
  def BuildHooksNodes(self):
1297
    """Build hooks nodes.
1298

1299
    """
1300
    return ([], [])
1301

    
1302
  def CheckPrereq(self):
1303
    """Check prerequisites.
1304

1305
    This checks whether the cluster is empty.
1306

1307
    Any errors are signaled by raising errors.OpPrereqError.
1308

1309
    """
1310
    master = self.cfg.GetMasterNode()
1311

    
1312
    nodelist = self.cfg.GetNodeList()
1313
    if len(nodelist) != 1 or nodelist[0] != master:
1314
      raise errors.OpPrereqError("There are still %d node(s) in"
1315
                                 " this cluster." % (len(nodelist) - 1),
1316
                                 errors.ECODE_INVAL)
1317
    instancelist = self.cfg.GetInstanceList()
1318
    if instancelist:
1319
      raise errors.OpPrereqError("There are still %d instance(s) in"
1320
                                 " this cluster." % len(instancelist),
1321
                                 errors.ECODE_INVAL)
1322

    
1323
  def Exec(self, feedback_fn):
1324
    """Destroys the cluster.
1325

1326
    """
1327
    master = self.cfg.GetMasterNode()
1328

    
1329
    # Run post hooks on master node before it's removed
1330
    _RunPostHook(self, master)
1331

    
1332
    result = self.rpc.call_node_stop_master(master, False)
1333
    result.Raise("Could not disable the master role")
1334

    
1335
    return master
1336

    
1337

    
1338
def _VerifyCertificate(filename):
1339
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1340

1341
  @type filename: string
1342
  @param filename: Path to PEM file
1343

1344
  """
1345
  try:
1346
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1347
                                           utils.ReadFile(filename))
1348
  except Exception, err: # pylint: disable-msg=W0703
1349
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1350
            "Failed to load X509 certificate %s: %s" % (filename, err))
1351

    
1352
  (errcode, msg) = \
1353
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1354
                                constants.SSL_CERT_EXPIRATION_ERROR)
1355

    
1356
  if msg:
1357
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1358
  else:
1359
    fnamemsg = None
1360

    
1361
  if errcode is None:
1362
    return (None, fnamemsg)
1363
  elif errcode == utils.CERT_WARNING:
1364
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1365
  elif errcode == utils.CERT_ERROR:
1366
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1367

    
1368
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1369

    
1370

    
1371
def _GetAllHypervisorParameters(cluster, instances):
1372
  """Compute the set of all hypervisor parameters.
1373

1374
  @type cluster: L{objects.Cluster}
1375
  @param cluster: the cluster object
1376
  @param instances: list of L{objects.Instance}
1377
  @param instances: additional instances from which to obtain parameters
1378
  @rtype: list of (origin, hypervisor, parameters)
1379
  @return: a list with all parameters found, indicating the hypervisor they
1380
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1381

1382
  """
1383
  hvp_data = []
1384

    
1385
  for hv_name in cluster.enabled_hypervisors:
1386
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1387

    
1388
  for os_name, os_hvp in cluster.os_hvp.items():
1389
    for hv_name, hv_params in os_hvp.items():
1390
      if hv_params:
1391
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1392
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1393

    
1394
  # TODO: collapse identical parameter values in a single one
1395
  for instance in instances:
1396
    if instance.hvparams:
1397
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1398
                       cluster.FillHV(instance)))
1399

    
1400
  return hvp_data
1401

    
1402

    
1403
class _VerifyErrors(object):
1404
  """Mix-in for cluster/group verify LUs.
1405

1406
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1407
  self.op and self._feedback_fn to be available.)
1408

1409
  """
1410
  TCLUSTER = "cluster"
1411
  TNODE = "node"
1412
  TINSTANCE = "instance"
1413

    
1414
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1415
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1416
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1417
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1418
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1419
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1420
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1421
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1422
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1423
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1424
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1425
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1426
  ENODEDRBD = (TNODE, "ENODEDRBD")
1427
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1428
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1429
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1430
  ENODEHV = (TNODE, "ENODEHV")
1431
  ENODELVM = (TNODE, "ENODELVM")
1432
  ENODEN1 = (TNODE, "ENODEN1")
1433
  ENODENET = (TNODE, "ENODENET")
1434
  ENODEOS = (TNODE, "ENODEOS")
1435
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1436
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1437
  ENODERPC = (TNODE, "ENODERPC")
1438
  ENODESSH = (TNODE, "ENODESSH")
1439
  ENODEVERSION = (TNODE, "ENODEVERSION")
1440
  ENODESETUP = (TNODE, "ENODESETUP")
1441
  ENODETIME = (TNODE, "ENODETIME")
1442
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1443

    
1444
  ETYPE_FIELD = "code"
1445
  ETYPE_ERROR = "ERROR"
1446
  ETYPE_WARNING = "WARNING"
1447

    
1448
  def _Error(self, ecode, item, msg, *args, **kwargs):
1449
    """Format an error message.
1450

1451
    Based on the opcode's error_codes parameter, either format a
1452
    parseable error code, or a simpler error string.
1453

1454
    This must be called only from Exec and functions called from Exec.
1455

1456
    """
1457
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1458
    itype, etxt = ecode
1459
    # first complete the msg
1460
    if args:
1461
      msg = msg % args
1462
    # then format the whole message
1463
    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
1464
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1465
    else:
1466
      if item:
1467
        item = " " + item
1468
      else:
1469
        item = ""
1470
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1471
    # and finally report it via the feedback_fn
1472
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
1473
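  # Editor's note: illustrative example only (not part of the original
  # module). With the opcode's error_codes option enabled, the line emitted
  # above looks like
  #   ERROR:ENODENET:node:node1.example.com:tcp communication failed
  # whereas the default human-readable form is
  #   ERROR: node node1.example.com: tcp communication failed
  # (the node name and message are hypothetical).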

    
1474
  def _ErrorIf(self, cond, *args, **kwargs):
1475
    """Log an error message if the passed condition is True.
1476

1477
    """
1478
    cond = (bool(cond)
1479
            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
1480
    if cond:
1481
      self._Error(*args, **kwargs)
1482
    # do not mark the operation as failed for WARN-only cases
1483
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1484
      self.bad = self.bad or cond
1485

    
1486

    
1487
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1488
  """Verifies the cluster config.
1489

1490
  """
1491
  REQ_BGL = True
1492

    
1493
  def _VerifyHVP(self, hvp_data):
1494
    """Verifies locally the syntax of the hypervisor parameters.
1495

1496
    """
1497
    for item, hv_name, hv_params in hvp_data:
1498
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1499
             (hv_name, item))
1500
      try:
1501
        hv_class = hypervisor.GetHypervisor(hv_name)
1502
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1503
        hv_class.CheckParameterSyntax(hv_params)
1504
      except errors.GenericError, err:
1505
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1506

    
1507
  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node_name,
         utils.CommaJoin(dangling_instances.get(node_name,
                                                ["no instances"])))
        for node_name in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
                  "the following nodes (and their instances) belong to a non-"
                  "existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return (not self.bad, [g.name for g in self.all_group_info.values()])


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call failed (overall,
        regardless of whether individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this node is a ghost, i.e. not known to the
        configuration (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: dict
    @ivar oslist: a dict of OSes as diagnosed by DiagnoseOS, indexed by name
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
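
    # Illustrative usage (mirroring what Exec() does further below for nodes
    # that are not in the configuration):
    #   gnode = self.NodeImage(name=nname)
    #   gnode.ghost = (nname not in self.all_node_info)
    # Runtime fields start out empty/False and are filled in later by the
    # _UpdateNode*() helpers as RPC results come in.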
  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing lock for extra LV-check nodes: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)
1716

    
1717
  def _VerifyNode(self, ninfo, nresult):
1718
    """Perform some basic validation on data returned from a node.
1719

1720
      - check the result data structure is well formed and has all the
1721
        mandatory fields
1722
      - check ganeti version
1723

1724
    @type ninfo: L{objects.Node}
1725
    @param ninfo: the node to check
1726
    @param nresult: the results from the node
1727
    @rtype: boolean
1728
    @return: whether overall this call was successful (and we can expect
1729
         reasonable values in the response)
1730

1731
    """
1732
    node = ninfo.name
1733
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1734

    
1735
    # main result, nresult should be a non-empty dict
1736
    test = not nresult or not isinstance(nresult, dict)
1737
    _ErrorIf(test, self.ENODERPC, node,
1738
                  "unable to verify node: no data returned")
1739
    if test:
1740
      return False
1741

    
1742
    # compares ganeti version
1743
    local_version = constants.PROTOCOL_VERSION
1744
    remote_version = nresult.get("version", None)
1745
    test = not (remote_version and
1746
                isinstance(remote_version, (list, tuple)) and
1747
                len(remote_version) == 2)
1748
    _ErrorIf(test, self.ENODERPC, node,
1749
             "connection to node returned invalid data")
1750
    if test:
1751
      return False
1752

    
1753
    test = local_version != remote_version[0]
1754
    _ErrorIf(test, self.ENODEVERSION, node,
1755
             "incompatible protocol versions: master %s,"
1756
             " node %s", local_version, remote_version[0])
1757
    if test:
1758
      return False
1759

    
1760
    # node seems compatible, we can actually try to look into its results
1761

    
1762
    # full package version
1763
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1764
                  self.ENODEVERSION, node,
1765
                  "software version mismatch: master %s, node %s",
1766
                  constants.RELEASE_VERSION, remote_version[1],
1767
                  code=self.ETYPE_WARNING)
1768

    
1769
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1770
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1771
      for hv_name, hv_result in hyp_result.iteritems():
1772
        test = hv_result is not None
1773
        _ErrorIf(test, self.ENODEHV, node,
1774
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1775

    
1776
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1777
    if ninfo.vm_capable and isinstance(hvp_result, list):
1778
      for item, hv_name, hv_result in hvp_result:
1779
        _ErrorIf(True, self.ENODEHV, node,
1780
                 "hypervisor %s parameter verify failure (source %s): %s",
1781
                 hv_name, item, hv_result)
1782

    
1783
    test = nresult.get(constants.NV_NODESETUP,
1784
                       ["Missing NODESETUP results"])
1785
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1786
             "; ".join(test))
1787

    
1788
    return True
1789

    
1790
  def _VerifyNodeTime(self, ninfo, nresult,
1791
                      nvinfo_starttime, nvinfo_endtime):
1792
    """Check the node time.
1793

1794
    @type ninfo: L{objects.Node}
1795
    @param ninfo: the node to check
1796
    @param nresult: the remote results for the node
1797
    @param nvinfo_starttime: the start time of the RPC call
1798
    @param nvinfo_endtime: the end time of the RPC call
1799

1800
    """
1801
    node = ninfo.name
1802
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1803

    
1804
    ntime = nresult.get(constants.NV_TIME, None)
1805
    try:
1806
      ntime_merged = utils.MergeTime(ntime)
1807
    except (ValueError, TypeError):
1808
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1809
      return
1810

    
1811
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1812
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1813
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1814
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1815
    else:
1816
      ntime_diff = None
1817

    
1818
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1819
             "Node time diverges by at least %s from master node time",
1820
             ntime_diff)
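
    # Worked example (illustrative; assumes the usual 150s value of
    # constants.NODE_MAX_CLOCK_SKEW): if the verify RPC ran between t0 and t1
    # on the master and the node reports a merged time below t0 - 150 or
    # above t1 + 150, ENODETIME is reported with the measured divergence,
    # e.g. "Node time diverges by at least 152.3s from master node time".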
1821

    
1822
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1823
    """Check the node LVM results.
1824

1825
    @type ninfo: L{objects.Node}
1826
    @param ninfo: the node to check
1827
    @param nresult: the remote results for the node
1828
    @param vg_name: the configured VG name
1829

1830
    """
1831
    if vg_name is None:
1832
      return
1833

    
1834
    node = ninfo.name
1835
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1836

    
1837
    # checks vg existence and size > 20G
1838
    vglist = nresult.get(constants.NV_VGLIST, None)
1839
    test = not vglist
1840
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1841
    if not test:
1842
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1843
                                            constants.MIN_VG_SIZE)
1844
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1845

    
1846
    # check pv names
1847
    pvlist = nresult.get(constants.NV_PVLIST, None)
1848
    test = pvlist is None
1849
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1850
    if not test:
1851
      # check that ':' is not present in PV names, since it's a
1852
      # special character for lvcreate (denotes the range of PEs to
1853
      # use on the PV)
1854
      for _, pvname, owner_vg in pvlist:
1855
        test = ":" in pvname
1856
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1857
                 " '%s' of VG '%s'", pvname, owner_vg)
1858

    
1859
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1860
    """Check the node bridges.
1861

1862
    @type ninfo: L{objects.Node}
1863
    @param ninfo: the node to check
1864
    @param nresult: the remote results for the node
1865
    @param bridges: the expected list of bridges
1866

1867
    """
1868
    if not bridges:
1869
      return
1870

    
1871
    node = ninfo.name
1872
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1873

    
1874
    missing = nresult.get(constants.NV_BRIDGES, None)
1875
    test = not isinstance(missing, list)
1876
    _ErrorIf(test, self.ENODENET, node,
1877
             "did not return valid bridge information")
1878
    if not test:
1879
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
1880
               utils.CommaJoin(sorted(missing)))
1881

    
1882
  def _VerifyNodeNetwork(self, ninfo, nresult):
1883
    """Check the node network connectivity results.
1884

1885
    @type ninfo: L{objects.Node}
1886
    @param ninfo: the node to check
1887
    @param nresult: the remote results for the node
1888

1889
    """
1890
    node = ninfo.name
1891
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1892

    
1893
    test = constants.NV_NODELIST not in nresult
1894
    _ErrorIf(test, self.ENODESSH, node,
1895
             "node hasn't returned node ssh connectivity data")
1896
    if not test:
1897
      if nresult[constants.NV_NODELIST]:
1898
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1899
          _ErrorIf(True, self.ENODESSH, node,
1900
                   "ssh communication with node '%s': %s", a_node, a_msg)
1901

    
1902
    test = constants.NV_NODENETTEST not in nresult
1903
    _ErrorIf(test, self.ENODENET, node,
1904
             "node hasn't returned node tcp connectivity data")
1905
    if not test:
1906
      if nresult[constants.NV_NODENETTEST]:
1907
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1908
        for anode in nlist:
1909
          _ErrorIf(True, self.ENODENET, node,
1910
                   "tcp communication with node '%s': %s",
1911
                   anode, nresult[constants.NV_NODENETTEST][anode])
1912

    
1913
    test = constants.NV_MASTERIP not in nresult
1914
    _ErrorIf(test, self.ENODENET, node,
1915
             "node hasn't returned node master IP reachability data")
1916
    if not test:
1917
      if not nresult[constants.NV_MASTERIP]:
1918
        if node == self.master_node:
1919
          msg = "the master node cannot reach the master IP (not configured?)"
1920
        else:
1921
          msg = "cannot reach the master IP"
1922
        _ErrorIf(True, self.ENODENET, node, msg)
1923

    
1924
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1925
                      diskstatus):
1926
    """Verify an instance.
1927

1928
    This function checks to see if the required block devices are
1929
    available on the instance's node.
1930

1931
    """
1932
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1933
    node_current = instanceconfig.primary_node
1934

    
1935
    node_vol_should = {}
1936
    instanceconfig.MapLVsByNode(node_vol_should)
1937

    
1938
    for node in node_vol_should:
1939
      n_img = node_image[node]
1940
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1941
        # ignore missing volumes on offline or broken nodes
1942
        continue
1943
      for volume in node_vol_should[node]:
1944
        test = volume not in n_img.volumes
1945
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1946
                 "volume %s missing on node %s", volume, node)
1947

    
1948
    if instanceconfig.admin_up:
1949
      pri_img = node_image[node_current]
1950
      test = instance not in pri_img.instances and not pri_img.offline
1951
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1952
               "instance not running on its primary node %s",
1953
               node_current)
1954

    
1955
    diskdata = [(nname, success, status, idx)
1956
                for (nname, disks) in diskstatus.items()
1957
                for idx, (success, status) in enumerate(disks)]
1958

    
1959
    for nname, success, bdev_status, idx in diskdata:
1960
      # the 'ghost node' construction in Exec() ensures that we have a
1961
      # node here
1962
      snode = node_image[nname]
1963
      bad_snode = snode.ghost or snode.offline
1964
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
1965
               self.EINSTANCEFAULTYDISK, instance,
1966
               "couldn't retrieve status for disk/%s on %s: %s",
1967
               idx, nname, bdev_status)
1968
      _ErrorIf((instanceconfig.admin_up and success and
1969
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1970
               self.EINSTANCEFAULTYDISK, instance,
1971
               "disk/%s on %s is faulty", idx, nname)
1972

    
1973
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1974
    """Verify if there are any unknown volumes in the cluster.
1975

1976
    The .os, .swap and backup volumes are ignored. All other volumes are
1977
    reported as unknown.
1978

1979
    @type reserved: L{ganeti.utils.FieldSet}
1980
    @param reserved: a FieldSet of reserved volume names
1981

1982
    """
1983
    for node, n_img in node_image.items():
1984
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1985
        # skip non-healthy nodes
1986
        continue
1987
      for volume in n_img.volumes:
1988
        test = ((node not in node_vol_should or
1989
                volume not in node_vol_should[node]) and
1990
                not reserved.Matches(volume))
1991
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1992
                      "volume %s is unknown", volume)
1993

    
1994
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1995
    """Verify N+1 Memory Resilience.
1996

1997
    Check that if one single node dies we can still start all the
1998
    instances it was primary for.
1999

2000
    """
2001
    cluster_info = self.cfg.GetClusterInfo()
2002
    for node, n_img in node_image.items():
2003
      # This code checks that every node which is now listed as
2004
      # secondary has enough memory to host all instances it is
2005
      # supposed to should a single other node in the cluster fail.
2006
      # FIXME: not ready for failover to an arbitrary node
2007
      # FIXME: does not support file-backed instances
2008
      # WARNING: we currently take into account down instances as well
2009
      # as up ones, considering that even if they're down someone
2010
      # might want to start them even in the event of a node failure.
2011
      if n_img.offline:
2012
        # we're skipping offline nodes from the N+1 warning, since
2013
        # most likely we don't have good memory information from them;
2014
        # we already list instances living on such nodes, and that's
2015
        # enough warning
2016
        continue
2017
      for prinode, instances in n_img.sbp.items():
2018
        needed_mem = 0
2019
        for instance in instances:
2020
          bep = cluster_info.FillBE(instance_cfg[instance])
2021
          if bep[constants.BE_AUTO_BALANCE]:
2022
            needed_mem += bep[constants.BE_MEMORY]
2023
        test = n_img.mfree < needed_mem
2024
        self._ErrorIf(test, self.ENODEN1, node,
2025
                      "not enough memory to accomodate instance failovers"
2026
                      " should node %s fail (%dMiB needed, %dMiB available)",
2027
                      prinode, needed_mem, n_img.mfree)
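
    # Worked example (illustrative): if node3 is secondary for two instances
    # whose primary is node1, both auto-balanced with BE_MEMORY of 2048 and
    # 1024 MiB, then node3 needs 3072 MiB free; reporting mfree of 2500 MiB
    # triggers ENODEN1 for the (node3, node1) pair.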
2028

    
2029
  @classmethod
2030
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2031
                   (files_all, files_all_opt, files_mc, files_vm)):
2032
    """Verifies file checksums collected from all nodes.
2033

2034
    @param errorif: Callback for reporting errors
2035
    @param nodeinfo: List of L{objects.Node} objects
2036
    @param master_node: Name of master node
2037
    @param all_nvinfo: RPC results
2038

2039
    """
2040
    node_names = frozenset(node.name for node in nodeinfo if not node.offline)
2041

    
2042
    assert master_node in node_names
2043
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
2044
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
2045
           "Found file listed in more than one file list"
2046

    
2047
    # Define functions determining which nodes to consider for a file
2048
    file2nodefn = dict([(filename, fn)
2049
      for (files, fn) in [(files_all, None),
2050
                          (files_all_opt, None),
2051
                          (files_mc, lambda node: (node.master_candidate or
2052
                                                   node.name == master_node)),
2053
                          (files_vm, lambda node: node.vm_capable)]
2054
      for filename in files])
2055

    
2056
    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
2057

    
2058
    for node in nodeinfo:
2059
      if node.offline:
2060
        continue
2061

    
2062
      nresult = all_nvinfo[node.name]
2063

    
2064
      if nresult.fail_msg or not nresult.payload:
2065
        node_files = None
2066
      else:
2067
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2068

    
2069
      test = not (node_files and isinstance(node_files, dict))
2070
      errorif(test, cls.ENODEFILECHECK, node.name,
2071
              "Node did not return file checksum data")
2072
      if test:
2073
        continue
2074

    
2075
      for (filename, checksum) in node_files.items():
2076
        # Check if the file should be considered for a node
2077
        fn = file2nodefn[filename]
2078
        if fn is None or fn(node):
2079
          fileinfo[filename].setdefault(checksum, set()).add(node.name)
2080

    
2081
    for (filename, checksums) in fileinfo.items():
2082
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2083

    
2084
      # Nodes having the file
2085
      with_file = frozenset(node_name
2086
                            for nodes in fileinfo[filename].values()
2087
                            for node_name in nodes)
2088

    
2089
      # Nodes missing file
2090
      missing_file = node_names - with_file
2091

    
2092
      if filename in files_all_opt:
2093
        # All or no nodes
2094
        errorif(missing_file and missing_file != node_names,
2095
                cls.ECLUSTERFILECHECK, None,
2096
                "File %s is optional, but it must exist on all or no"
2097
                " nodes (not found on %s)",
2098
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2099
      else:
2100
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
2101
                "File %s is missing from node(s) %s", filename,
2102
                utils.CommaJoin(utils.NiceSort(missing_file)))
2103

    
2104
      # See if there are multiple versions of the file
2105
      test = len(checksums) > 1
2106
      if test:
2107
        variants = ["variant %s on %s" %
2108
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2109
                    for (idx, (checksum, nodes)) in
2110
                      enumerate(sorted(checksums.items()))]
2111
      else:
2112
        variants = []
2113

    
2114
      errorif(test, cls.ECLUSTERFILECHECK, None,
2115
              "File %s found with %s different checksums (%s)",
2116
              filename, len(checksums), "; ".join(variants))
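
    # Data-shape sketch (derived from the loops above): fileinfo maps
    #   filename -> {checksum -> set(node names reporting that checksum)}
    # so a consistently replicated file ends up with a single checksum key,
    # while divergent copies show up as additional keys (the "variants"
    # reported above).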
2117

    
2118
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2119
                      drbd_map):
2120
    """Verifies and the node DRBD status.
2121

2122
    @type ninfo: L{objects.Node}
2123
    @param ninfo: the node to check
2124
    @param nresult: the remote results for the node
2125
    @param instanceinfo: the dict of instances
2126
    @param drbd_helper: the configured DRBD usermode helper
2127
    @param drbd_map: the DRBD map as returned by
2128
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2129

2130
    """
2131
    node = ninfo.name
2132
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2133

    
2134
    if drbd_helper:
2135
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2136
      test = (helper_result == None)
2137
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
2138
               "no drbd usermode helper returned")
2139
      if helper_result:
2140
        status, payload = helper_result
2141
        test = not status
2142
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2143
                 "drbd usermode helper check unsuccessful: %s", payload)
2144
        test = status and (payload != drbd_helper)
2145
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2146
                 "wrong drbd usermode helper: %s", payload)
2147

    
2148
    # compute the DRBD minors
2149
    node_drbd = {}
2150
    for minor, instance in drbd_map[node].items():
2151
      test = instance not in instanceinfo
2152
      _ErrorIf(test, self.ECLUSTERCFG, None,
2153
               "ghost instance '%s' in temporary DRBD map", instance)
2154
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
2157
      if test:
2158
        node_drbd[minor] = (instance, False)
2159
      else:
2160
        instance = instanceinfo[instance]
2161
        node_drbd[minor] = (instance.name, instance.admin_up)
2162

    
2163
    # and now check them
2164
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2165
    test = not isinstance(used_minors, (tuple, list))
2166
    _ErrorIf(test, self.ENODEDRBD, node,
2167
             "cannot parse drbd status file: %s", str(used_minors))
2168
    if test:
2169
      # we cannot check drbd status
2170
      return
2171

    
2172
    for minor, (iname, must_exist) in node_drbd.items():
2173
      test = minor not in used_minors and must_exist
2174
      _ErrorIf(test, self.ENODEDRBD, node,
2175
               "drbd minor %d of instance %s is not active", minor, iname)
2176
    for minor in used_minors:
2177
      test = minor not in node_drbd
2178
      _ErrorIf(test, self.ENODEDRBD, node,
2179
               "unallocated drbd minor %d is in use", minor)
2180

    
2181
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2182
    """Builds the node OS structures.
2183

2184
    @type ninfo: L{objects.Node}
2185
    @param ninfo: the node to check
2186
    @param nresult: the remote results for the node
2187
    @param nimg: the node image object
2188

2189
    """
2190
    node = ninfo.name
2191
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2192

    
2193
    remote_os = nresult.get(constants.NV_OSLIST, None)
2194
    test = (not isinstance(remote_os, list) or
2195
            not compat.all(isinstance(v, list) and len(v) == 7
2196
                           for v in remote_os))
2197

    
2198
    _ErrorIf(test, self.ENODEOS, node,
2199
             "node hasn't returned valid OS data")
2200

    
2201
    nimg.os_fail = test
2202

    
2203
    if test:
2204
      return
2205

    
2206
    os_dict = {}
2207

    
2208
    for (name, os_path, status, diagnose,
2209
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2210

    
2211
      if name not in os_dict:
2212
        os_dict[name] = []
2213

    
2214
      # parameters is a list of lists instead of list of tuples due to
2215
      # JSON lacking a real tuple type, fix it:
2216
      parameters = [tuple(v) for v in parameters]
2217
      os_dict[name].append((os_path, status, diagnose,
2218
                            set(variants), set(parameters), set(api_ver)))
2219

    
2220
    nimg.oslist = os_dict
2221

    
2222
  def _VerifyNodeOS(self, ninfo, nimg, base):
2223
    """Verifies the node OS list.
2224

2225
    @type ninfo: L{objects.Node}
2226
    @param ninfo: the node to check
2227
    @param nimg: the node image object
2228
    @param base: the 'template' node we match against (e.g. from the master)
2229

2230
    """
2231
    node = ninfo.name
2232
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2233

    
2234
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2235

    
2236
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2237
    for os_name, os_data in nimg.oslist.items():
2238
      assert os_data, "Empty OS status for OS %s?!" % os_name
2239
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2240
      _ErrorIf(not f_status, self.ENODEOS, node,
2241
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2242
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
2243
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2244
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2245
      # comparisons with the 'base' image
2246
      test = os_name not in base.oslist
2247
      _ErrorIf(test, self.ENODEOS, node,
2248
               "Extra OS %s not present on reference node (%s)",
2249
               os_name, base.name)
2250
      if test:
2251
        continue
2252
      assert base.oslist[os_name], "Base node has empty OS status?"
2253
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2254
      if not b_status:
2255
        # base OS is invalid, skipping
2256
        continue
2257
      for kind, a, b in [("API version", f_api, b_api),
2258
                         ("variants list", f_var, b_var),
2259
                         ("parameters", beautify_params(f_param),
2260
                          beautify_params(b_param))]:
2261
        _ErrorIf(a != b, self.ENODEOS, node,
2262
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2263
                 kind, os_name, base.name,
2264
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2265

    
2266
    # check any missing OSes
2267
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2268
    _ErrorIf(missing, self.ENODEOS, node,
2269
             "OSes present on reference node %s but missing on this node: %s",
2270
             base.name, utils.CommaJoin(missing))
2271

    
2272
  def _VerifyOob(self, ninfo, nresult):
2273
    """Verifies out of band functionality of a node.
2274

2275
    @type ninfo: L{objects.Node}
2276
    @param ninfo: the node to check
2277
    @param nresult: the remote results for the node
2278

2279
    """
2280
    node = ninfo.name
2281
    # We just have to verify the paths on master and/or master candidates
2282
    # as the oob helper is invoked on the master
2283
    if ((ninfo.master_candidate or ninfo.master_capable) and
2284
        constants.NV_OOB_PATHS in nresult):
2285
      for path_result in nresult[constants.NV_OOB_PATHS]:
2286
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
2287

    
2288
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2289
    """Verifies and updates the node volume data.
2290

2291
    This function will update a L{NodeImage}'s internal structures
2292
    with data from the remote call.
2293

2294
    @type ninfo: L{objects.Node}
2295
    @param ninfo: the node to check
2296
    @param nresult: the remote results for the node
2297
    @param nimg: the node image object
2298
    @param vg_name: the configured VG name
2299

2300
    """
2301
    node = ninfo.name
2302
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2303

    
2304
    nimg.lvm_fail = True
2305
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2306
    if vg_name is None:
2307
      pass
2308
    elif isinstance(lvdata, basestring):
2309
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2310
               utils.SafeEncode(lvdata))
2311
    elif not isinstance(lvdata, dict):
2312
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2313
    else:
2314
      nimg.volumes = lvdata
2315
      nimg.lvm_fail = False
2316

    
2317
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2318
    """Verifies and updates the node instance list.
2319

2320
    If the listing was successful, then updates this node's instance
2321
    list. Otherwise, it marks the RPC call as failed for the instance
2322
    list key.
2323

2324
    @type ninfo: L{objects.Node}
2325
    @param ninfo: the node to check
2326
    @param nresult: the remote results for the node
2327
    @param nimg: the node image object
2328

2329
    """
2330
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2331
    test = not isinstance(idata, list)
2332
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2333
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2334
    if test:
2335
      nimg.hyp_fail = True
2336
    else:
2337
      nimg.instances = idata
2338

    
2339
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2340
    """Verifies and computes a node information map
2341

2342
    @type ninfo: L{objects.Node}
2343
    @param ninfo: the node to check
2344
    @param nresult: the remote results for the node
2345
    @param nimg: the node image object
2346
    @param vg_name: the configured VG name
2347

2348
    """
2349
    node = ninfo.name
2350
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2351

    
2352
    # try to read free memory (from the hypervisor)
2353
    hv_info = nresult.get(constants.NV_HVINFO, None)
2354
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2355
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2356
    if not test:
2357
      try:
2358
        nimg.mfree = int(hv_info["memory_free"])
2359
      except (ValueError, TypeError):
2360
        _ErrorIf(True, self.ENODERPC, node,
2361
                 "node returned invalid nodeinfo, check hypervisor")
2362

    
2363
    # FIXME: devise a free space model for file based instances as well
2364
    if vg_name is not None:
2365
      test = (constants.NV_VGLIST not in nresult or
2366
              vg_name not in nresult[constants.NV_VGLIST])
2367
      _ErrorIf(test, self.ENODELVM, node,
2368
               "node didn't return data for the volume group '%s'"
2369
               " - it is either missing or broken", vg_name)
2370
      if not test:
2371
        try:
2372
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2373
        except (ValueError, TypeError):
2374
          _ErrorIf(True, self.ENODERPC, node,
2375
                   "node returned invalid LVM info, check LVM status")
2376

    
2377
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2378
    """Gets per-disk status information for all instances.
2379

2380
    @type nodelist: list of strings
2381
    @param nodelist: Node names
2382
    @type node_image: dict of (name, L{objects.Node})
2383
    @param node_image: Node objects
2384
    @type instanceinfo: dict of (name, L{objects.Instance})
2385
    @param instanceinfo: Instance objects
2386
    @rtype: {instance: {node: [(success, payload)]}}
2387
    @return: a dictionary of per-instance dictionaries with nodes as
2388
        keys and disk information as values; the disk information is a
2389
        list of tuples (success, payload)
2390

2391
    """
2392
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2393

    
2394
    node_disks = {}
2395
    node_disks_devonly = {}
2396
    diskless_instances = set()
2397
    diskless = constants.DT_DISKLESS
2398

    
2399
    for nname in nodelist:
2400
      node_instances = list(itertools.chain(node_image[nname].pinst,
2401
                                            node_image[nname].sinst))
2402
      diskless_instances.update(inst for inst in node_instances
2403
                                if instanceinfo[inst].disk_template == diskless)
2404
      disks = [(inst, disk)
2405
               for inst in node_instances
2406
               for disk in instanceinfo[inst].disks]
2407

    
2408
      if not disks:
2409
        # No need to collect data
2410
        continue
2411

    
2412
      node_disks[nname] = disks
2413

    
2414
      # Creating copies as SetDiskID below will modify the objects and that can
2415
      # lead to incorrect data returned from nodes
2416
      devonly = [dev.Copy() for (_, dev) in disks]
2417

    
2418
      for dev in devonly:
2419
        self.cfg.SetDiskID(dev, nname)
2420

    
2421
      node_disks_devonly[nname] = devonly
2422

    
2423
    assert len(node_disks) == len(node_disks_devonly)
2424

    
2425
    # Collect data from all nodes with disks
2426
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2427
                                                          node_disks_devonly)
2428

    
2429
    assert len(result) == len(node_disks)
2430

    
2431
    instdisk = {}
2432

    
2433
    for (nname, nres) in result.items():
2434
      disks = node_disks[nname]
2435

    
2436
      if nres.offline:
2437
        # No data from this node
2438
        data = len(disks) * [(False, "node offline")]
2439
      else:
2440
        msg = nres.fail_msg
2441
        _ErrorIf(msg, self.ENODERPC, nname,
2442
                 "while getting disk information: %s", msg)
2443
        if msg:
2444
          # No data from this node
2445
          data = len(disks) * [(False, msg)]
2446
        else:
2447
          data = []
2448
          for idx, i in enumerate(nres.payload):
2449
            if isinstance(i, (tuple, list)) and len(i) == 2:
2450
              data.append(i)
2451
            else:
2452
              logging.warning("Invalid result from node %s, entry %d: %s",
2453
                              nname, idx, i)
2454
              data.append((False, "Invalid result from the remote node"))
2455

    
2456
      for ((inst, _), status) in zip(disks, data):
2457
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2458

    
2459
    # Add empty entries for diskless instances.
2460
    for inst in diskless_instances:
2461
      assert inst not in instdisk
2462
      instdisk[inst] = {}
2463

    
2464
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2465
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2466
                      compat.all(isinstance(s, (tuple, list)) and
2467
                                 len(s) == 2 for s in statuses)
2468
                      for inst, nnames in instdisk.items()
2469
                      for nname, statuses in nnames.items())
2470
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2471

    
2472
    return instdisk
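
    # Shape example (illustrative): for a DRBD instance "inst1" with one disk
    # on nodes "nodeA"/"nodeB", the result looks roughly like
    #   {"inst1": {"nodeA": [(True, <blockdev status>)],
    #              "nodeB": [(True, <blockdev status>)]}}
    # while diskless instances map to an empty dict.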
2473

    
2474
  def BuildHooksEnv(self):
2475
    """Build hooks env.
2476

2477
    Cluster-Verify hooks are run only in the post phase; a hook failure is
    logged in the verify output and makes the verification fail.
2479

2480
    """
2481
    env = {
2482
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2483
      }
2484

    
2485
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2486
               for node in self.my_node_info.values())
2487

    
2488
    return env
2489

    
2490
  def BuildHooksNodes(self):
2491
    """Build hooks nodes.
2492

2493
    """
2494
    return ([], self.my_node_names)
2495

    
2496
  def Exec(self, feedback_fn):
2497
    """Verify integrity of the node group, performing various test on nodes.
2498

2499
    """
2500
    # This method has too many local variables. pylint: disable-msg=R0914
2501

    
2502
    if not self.my_node_names:
2503
      # empty node group
2504
      feedback_fn("* Empty node group, skipping verification")
2505
      return True
2506

    
2507
    self.bad = False
2508
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2509
    verbose = self.op.verbose
2510
    self._feedback_fn = feedback_fn
2511

    
2512
    vg_name = self.cfg.GetVGName()
2513
    drbd_helper = self.cfg.GetDRBDHelper()
2514
    cluster = self.cfg.GetClusterInfo()
2515
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2516
    hypervisors = cluster.enabled_hypervisors
2517
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2518

    
2519
    i_non_redundant = [] # Non redundant instances
2520
    i_non_a_balanced = [] # Non auto-balanced instances
2521
    n_offline = 0 # Count of offline nodes
2522
    n_drained = 0 # Count of nodes being drained
2523
    node_vol_should = {}
2524

    
2525
    # FIXME: verify OS list
2526

    
2527
    # File verification
2528
    filemap = _ComputeAncillaryFiles(cluster, False)
2529

    
2530
    # do local checksums
2531
    master_node = self.master_node = self.cfg.GetMasterNode()
2532
    master_ip = self.cfg.GetMasterIP()
2533

    
2534
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2535

    
2536
    # We will make nodes contact all nodes in their group, and one node from
2537
    # every other group.
2538
    # TODO: should it be a *random* node, different every time?
2539
    online_nodes = [node.name for node in node_data_list if not node.offline]
2540
    other_group_nodes = {}
2541

    
2542
    for name in sorted(self.all_node_info):
2543
      node = self.all_node_info[name]
2544
      if (node.group not in other_group_nodes
2545
          and node.group != self.group_uuid
2546
          and not node.offline):
2547
        other_group_nodes[node.group] = node.name
2548

    
2549
    node_verify_param = {
2550
      constants.NV_FILELIST:
2551
        utils.UniqueSequence(filename
2552
                             for files in filemap
2553
                             for filename in files),
2554
      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
2555
      constants.NV_HYPERVISOR: hypervisors,
2556
      constants.NV_HVPARAMS:
2557
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2558
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2559
                                 for node in node_data_list
2560
                                 if not node.offline],
2561
      constants.NV_INSTANCELIST: hypervisors,
2562
      constants.NV_VERSION: None,
2563
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2564
      constants.NV_NODESETUP: None,
2565
      constants.NV_TIME: None,
2566
      constants.NV_MASTERIP: (master_node, master_ip),
2567
      constants.NV_OSLIST: None,
2568
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2569
      }
2570

    
2571
    if vg_name is not None:
2572
      node_verify_param[constants.NV_VGLIST] = None
2573
      node_verify_param[constants.NV_LVLIST] = vg_name
2574
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2575
      node_verify_param[constants.NV_DRBDLIST] = None
2576

    
2577
    if drbd_helper:
2578
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2579

    
2580
    # bridge checks
2581
    # FIXME: this needs to be changed per node-group, not cluster-wide
2582
    bridges = set()
2583
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2584
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2585
      bridges.add(default_nicpp[constants.NIC_LINK])
2586
    for instance in self.my_inst_info.values():
2587
      for nic in instance.nics:
2588
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2589
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2590
          bridges.add(full_nic[constants.NIC_LINK])
2591

    
2592
    if bridges:
2593
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2594

    
2595
    # Build our expected cluster state
2596
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2597
                                                 name=node.name,
2598
                                                 vm_capable=node.vm_capable))
2599
                      for node in node_data_list)
2600

    
2601
    # Gather OOB paths
2602
    oob_paths = []
2603
    for node in self.all_node_info.values():
2604
      path = _SupportsOob(self.cfg, node)
2605
      if path and path not in oob_paths:
2606
        oob_paths.append(path)
2607

    
2608
    if oob_paths:
2609
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2610

    
2611
    for instance in self.my_inst_names:
2612
      inst_config = self.my_inst_info[instance]
2613

    
2614
      for nname in inst_config.all_nodes:
2615
        if nname not in node_image:
2616
          gnode = self.NodeImage(name=nname)
2617
          gnode.ghost = (nname not in self.all_node_info)
2618
          node_image[nname] = gnode
2619

    
2620
      inst_config.MapLVsByNode(node_vol_should)
2621

    
2622
      pnode = inst_config.primary_node
2623
      node_image[pnode].pinst.append(instance)
2624

    
2625
      for snode in inst_config.secondary_nodes:
2626
        nimg = node_image[snode]
2627
        nimg.sinst.append(instance)
2628
        if pnode not in nimg.sbp:
2629
          nimg.sbp[pnode] = []
2630
        nimg.sbp[pnode].append(instance)
2631

    
2632
    # At this point, we have the in-memory data structures complete,
2633
    # except for the runtime information, which we'll gather next
2634

    
2635
    # Due to the way our RPC system works, exact response times cannot be
2636
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2637
    # time before and after executing the request, we can at least have a time
2638
    # window.
2639
    nvinfo_starttime = time.time()
2640
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2641
                                           node_verify_param,
2642
                                           self.cfg.GetClusterName())
2643
    nvinfo_endtime = time.time()
2644

    
2645
    if self.extra_lv_nodes and vg_name is not None:
2646
      extra_lv_nvinfo = \
2647
          self.rpc.call_node_verify(self.extra_lv_nodes,
2648
                                    {constants.NV_LVLIST: vg_name},
2649
                                    self.cfg.GetClusterName())
2650
    else:
2651
      extra_lv_nvinfo = {}
2652

    
2653
    all_drbd_map = self.cfg.ComputeDRBDMap()
2654

    
2655
    feedback_fn("* Gathering disk information (%s nodes)" %
2656
                len(self.my_node_names))
2657
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2658
                                     self.my_inst_info)
2659

    
2660
    feedback_fn("* Verifying configuration file consistency")
2661

    
2662
    # If not all nodes are being checked, we need to make sure the master node
2663
    # and a non-checked vm_capable node are in the list.
2664
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2665
    if absent_nodes:
2666
      vf_nvinfo = all_nvinfo.copy()
2667
      vf_node_info = list(self.my_node_info.values())
2668
      additional_nodes = []
2669
      if master_node not in self.my_node_info:
2670
        additional_nodes.append(master_node)
2671
        vf_node_info.append(self.all_node_info[master_node])
2672
      # Add the first vm_capable node we find which is not included
2673
      for node in absent_nodes:
2674
        nodeinfo = self.all_node_info[node]
2675
        if nodeinfo.vm_capable and not nodeinfo.offline:
2676
          additional_nodes.append(node)
2677
          vf_node_info.append(self.all_node_info[node])
2678
          break
2679
      key = constants.NV_FILELIST
2680
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2681
                                                 {key: node_verify_param[key]},
2682
                                                 self.cfg.GetClusterName()))
2683
    else:
2684
      vf_nvinfo = all_nvinfo
2685
      vf_node_info = self.my_node_info.values()
2686

    
2687
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2688

    
2689
    feedback_fn("* Verifying node status")
2690

    
2691
    refos_img = None
2692

    
2693
    for node_i in node_data_list:
2694
      node = node_i.name
2695
      nimg = node_image[node]
2696

    
2697
      if node_i.offline:
2698
        if verbose:
2699
          feedback_fn("* Skipping offline node %s" % (node,))
2700
        n_offline += 1
2701
        continue
2702

    
2703
      if node == master_node:
2704
        ntype = "master"
2705
      elif node_i.master_candidate:
2706
        ntype = "master candidate"
2707
      elif node_i.drained:
2708
        ntype = "drained"
2709
        n_drained += 1
2710
      else:
2711
        ntype = "regular"
2712
      if verbose:
2713
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2714

    
2715
      msg = all_nvinfo[node].fail_msg
2716
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2717
      if msg:
2718
        nimg.rpc_fail = True
2719
        continue
2720

    
2721
      nresult = all_nvinfo[node].payload
2722

    
2723
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2724
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2725
      self._VerifyNodeNetwork(node_i, nresult)
2726
      self._VerifyOob(node_i, nresult)
2727

    
2728
      if nimg.vm_capable:
2729
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2730
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2731
                             all_drbd_map)
2732

    
2733
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2734
        self._UpdateNodeInstances(node_i, nresult, nimg)
2735
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2736
        self._UpdateNodeOS(node_i, nresult, nimg)
2737

    
2738
        if not nimg.os_fail:
2739
          if refos_img is None:
2740
            refos_img = nimg
2741
          self._VerifyNodeOS(node_i, nimg, refos_img)
2742
        self._VerifyNodeBridges(node_i, nresult, bridges)
2743

    
2744
        # Check whether all running instances are primary for the node. (This
2745
        # can no longer be done from _VerifyInstance below, since some of the
2746
        # wrong instances could be from other node groups.)
2747
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2748

    
2749
        for inst in non_primary_inst:
2750
          test = inst in self.all_inst_info
2751
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
2752
                   "instance should not run on node %s", node_i.name)
2753
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
2754
                   "node is running unknown instance %s", inst)
2755

    
2756
    for node, result in extra_lv_nvinfo.items():
2757
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2758
                              node_image[node], vg_name)
2759

    
2760
    feedback_fn("* Verifying instance status")
2761
    for instance in self.my_inst_names:
2762
      if verbose:
2763
        feedback_fn("* Verifying instance %s" % instance)
2764
      inst_config = self.my_inst_info[instance]
2765
      self._VerifyInstance(instance, inst_config, node_image,
2766
                           instdisk[instance])
2767
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               self.EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance has offline secondary node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = 0

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])


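# Illustrative note (not part of the original module): LUClusterVerifyDisks
# does no disk checking itself; it only fans out one OpGroupVerifyDisks job per
# node group via ResultWithJobs.  With two groups the returned value would be
# roughly equivalent to
#   ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name="group1")],
#                   [opcodes.OpGroupVerifyDisks(group_name="group2")]])
# and the processor then submits each inner opcode list as a separate job.
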
class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in
               self.glm.list_owned(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
    if owned_instances != wanted_instances:
      raise errors.OpPrereqError("Instances in node group %s changed since"
                                 " locks were acquired, wanted %s, have %s;"
                                 " retry the operation" %
                                 (self.op.group_name,
                                  utils.CommaJoin(wanted_instances),
                                  utils.CommaJoin(owned_instances)),
                                 errors.ECODE_STATE)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      _CheckInstanceNodeGroups(self.cfg, instance_name, owned_groups)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(key)

    return (res_nodes, list(res_instances), res_missing)


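# Illustrative example (hypothetical names, not taken from the original code):
# the tuple returned by LUGroupVerifyDisks.Exec() could look roughly like
#   ({"node3.example.com": "Error while running lvs"},      # per-node errors
#    ["instance7"],                                          # need activate-disks
#    {"instance9": [("node4.example.com", "xenvg/disk0")]})  # missing volumes
# where the node, instance and volume names are purely made up.
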
class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


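# Illustrative note (not part of the original module): blockdev_getsize returns
# sizes in bytes, and the "size >> 20" above converts them to MiB before they
# are compared with the recorded configuration.  A hypothetical return value of
# Exec() would be a list such as [("instance1.example.com", 0, 10240)], meaning
# disk 0 of that instance was corrected to 10240 MiB.
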
class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


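# Illustrative summary (an assumption, not original text): the rename sequence
# above is "stop the master IP -> update cluster_name/master_ip in the config
# -> rewrite and redistribute the known_hosts file -> restart the master IP",
# with the restart placed in the finally: block so the master role is
# re-enabled even if the redistribution step fails.
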
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


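# Illustrative example (hypothetical values, not part of the original module):
# hypervisor parameters are layered with objects.FillDict, later dicts winning,
# roughly as in
#   objects.FillDict(constants.HVC_DEFAULTS["xen-pvm"],   # built-in defaults
#                    objects.FillDict(cluster.hvparams.get("xen-pvm", {}),
#                                     {"kernel_path": "/boot/vmlinuz-custom"}))
# which approximates how CheckPrereq builds self.new_hvparams before running
# the per-hypervisor syntax and node-level checks.
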
def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)


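# Illustrative note (not part of the original module): the four sets returned
# above are consumed differently: files_all go to every online node,
# files_all_opt entries may be missing as long as they are missing everywhere,
# files_mc belong only on master candidates (e.g. the cluster config), and
# files_vm only on VM-capable nodes (hypervisor-specific ancillary files).
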
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


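# Illustrative note (an assumption about intent, not original text): the
# _WaitForSync loop above re-polls blockdev_getmirrorstatus after at most 60
# seconds (or sooner, using the DRBD estimated time), gives up after 10
# consecutive RPC failures with 6-second pauses, and when the mirrors report
# "done but degraded" it re-checks once per second for up to degr_retries (10)
# extra iterations before returning the final degraded state.
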
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))

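# Illustrative example (hypothetical data, not part of the original module):
# for an out-of-band power-status query on two nodes, LUOobCommand.Exec()
# returns one entry list per node, roughly
#   [[(constants.RS_NORMAL, "node1.example.com"),
#     (constants.RS_NORMAL, {"powered": True})],
#    [(constants.RS_NORMAL, "node2.example.com"),
#     (constants.RS_UNAVAIL, None)]]   # second node has no OOB support
# where the node names and payload keys are made up for illustration.
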
class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.