#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc

import ganeti.masterd.instance # pylint: disable=W0611


#: Size of DRBD meta block device
DRBD_META_SIZE = 128


class ResultWithJobs:
72
  """Data container for LU results with jobs.
73

74
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
75
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
76
  contained in the C{jobs} attribute and include the job IDs in the opcode
77
  result.
78

79
  """
80
  def __init__(self, jobs, **kwargs):
81
    """Initializes this class.
82

83
    Additional return values can be specified as keyword arguments.
84

85
    @type jobs: list of lists of L{opcodes.OpCode}
86
    @param jobs: A list of lists of opcode objects
87

88
    """
89
    self.jobs = jobs
90
    self.other = kwargs
91
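# Illustrative sketch (not part of the original module): an LU that wants the
# job queue to run follow-up work can hand it back via ResultWithJobs from its
# Exec method.  The opcode used below is only an example of the pattern:
#
#   def Exec(self, feedback_fn):
#     ...  # do the primary work here
#     jobs = [[opcodes.OpTestDelay(duration=0)]]  # one job with one opcode
#     return ResultWithJobs(jobs, result="primary work done")
#
# mcpu.Processor._ProcessResult then submits the jobs and includes their IDs
# in the opcode result, as described in the class docstring above.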

    
92

    
93
class LogicalUnit(object):
94
  """Logical Unit base class.
95

96
  Subclasses must follow these rules:
97
    - implement ExpandNames
98
    - implement CheckPrereq (except when tasklets are used)
99
    - implement Exec (except when tasklets are used)
100
    - implement BuildHooksEnv
101
    - implement BuildHooksNodes
102
    - redefine HPATH and HTYPE
103
    - optionally redefine their run requirements:
104
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
105

106
  Note that all commands require root permissions.
107

108
  @ivar dry_run_result: the value (if any) that will be returned to the caller
109
      in dry-run mode (signalled by opcode dry_run parameter)
110

111
  """
112
  HPATH = None
113
  HTYPE = None
114
  REQ_BGL = True
115

    
116
  def __init__(self, processor, op, context, rpc_runner):
117
    """Constructor for LogicalUnit.
118

119
    This needs to be overridden in derived classes in order to check op
120
    validity.
121

122
    """
123
    self.proc = processor
124
    self.op = op
125
    self.cfg = context.cfg
126
    self.glm = context.glm
127
    # readability alias
128
    self.owned_locks = context.glm.list_owned
129
    self.context = context
130
    self.rpc = rpc_runner
131
    # Dicts used to declare locking needs to mcpu
132
    self.needed_locks = None
133
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
134
    self.add_locks = {}
135
    self.remove_locks = {}
136
    # Used to force good behavior when calling helper functions
137
    self.recalculate_locks = {}
138
    # logging
139
    self.Log = processor.Log # pylint: disable=C0103
140
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
141
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
142
    self.LogStep = processor.LogStep # pylint: disable=C0103
143
    # support for dry-run
144
    self.dry_run_result = None
145
    # support for generic debug attribute
146
    if (not hasattr(self.op, "debug_level") or
147
        not isinstance(self.op.debug_level, int)):
148
      self.op.debug_level = 0
149

    
150
    # Tasklets
151
    self.tasklets = None
152

    
153
    # Validate opcode parameters and set defaults
154
    self.op.Validate(True)
155

    
156
    self.CheckArguments()
157

    
158
  def CheckArguments(self):
159
    """Check syntactic validity for the opcode arguments.
160

161
    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.
172

173
    """
174
    pass
175

    
176
  def ExpandNames(self):
177
    """Expand names for this LU.
178

179
    This method is called before starting to execute the opcode, and it should
180
    update all the parameters of the opcode to their canonical form (e.g. a
181
    short node name must be fully expanded after this method has successfully
182
    completed). This way locking, hooks, logging, etc. can work correctly.
183

184
    LUs which implement this method must also populate the self.needed_locks
185
    member, as a dict with lock levels as keys, and a list of needed lock names
186
    as values. Rules:
187

188
      - use an empty dict if you don't need any lock
189
      - if you don't need any lock at a particular level omit that level
190
      - don't put anything for the BGL level
191
      - if you want all locks at a level use locking.ALL_SET as a value
192

193
    If you need to share locks (rather than acquire them exclusively) at one
194
    level you can modify self.share_locks, setting a true value (usually 1) for
195
    that level. By default locks are not shared.
196

197
    This function can also define a list of tasklets, which then will be
198
    executed in order instead of the usual LU-level CheckPrereq and Exec
199
    functions, if those are not defined by the LU.
200

201
    Examples::
202

203
      # Acquire all nodes and one instance
204
      self.needed_locks = {
205
        locking.LEVEL_NODE: locking.ALL_SET,
206
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
207
      }
208
      # Acquire just two nodes
209
      self.needed_locks = {
210
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
211
      }
212
      # Acquire no locks
213
      self.needed_locks = {} # No, you can't leave it to the default value None
214

215
    """
216
    # The implementation of this method is mandatory only if the new LU is
217
    # concurrent, so that old LUs don't need to be changed all at the same
218
    # time.
219
    if self.REQ_BGL:
220
      self.needed_locks = {} # Exclusive LUs don't need locks.
221
    else:
222
      raise NotImplementedError
223
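  # Illustrative sketch (not in the original source): a concurrent LU
  # (REQ_BGL = False) that wants all node locks in shared mode could
  # override ExpandNames along these lines; the exact lock set is
  # hypothetical:
  #
  #   def ExpandNames(self):
  #     self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #     self.share_locks[locking.LEVEL_NODE] = 1  # shared, not exclusive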

    
224
  def DeclareLocks(self, level):
225
    """Declare LU locking needs for a level
226

227
    While most LUs can just declare their locking needs at ExpandNames time,
228
    sometimes there's the need to calculate some locks after having acquired
229
    the ones before. This function is called just before acquiring locks at a
230
    particular level, but after acquiring the ones at lower levels, and permits
231
    such calculations. It can be used to modify self.needed_locks, and by
232
    default it does nothing.
233

234
    This function is only called if you have something already set in
235
    self.needed_locks for the level.
236

237
    @param level: Locking level which is going to be locked
238
    @type level: member of ganeti.locking.LEVELS
239

240
    """
241

    
242
  def CheckPrereq(self):
243
    """Check prerequisites for this LU.
244

245
    This method should check that the prerequisites for the execution
246
    of this LU are fulfilled. It can do internode communication, but
247
    it should be idempotent - no cluster or system changes are
248
    allowed.
249

250
    The method should raise errors.OpPrereqError in case something is
251
    not fulfilled. Its return value is ignored.
252

253
    This method should also update all the parameters of the opcode to
254
    their canonical form if it hasn't been done by ExpandNames before.
255

256
    """
257
    if self.tasklets is not None:
258
      for (idx, tl) in enumerate(self.tasklets):
259
        logging.debug("Checking prerequisites for tasklet %s/%s",
260
                      idx + 1, len(self.tasklets))
261
        tl.CheckPrereq()
262
    else:
263
      pass
264

    
265
  def Exec(self, feedback_fn):
266
    """Execute the LU.
267

268
    This method should implement the actual work. It should raise
269
    errors.OpExecError for failures that are somewhat dealt with in
270
    code, or expected.
271

272
    """
273
    if self.tasklets is not None:
274
      for (idx, tl) in enumerate(self.tasklets):
275
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
276
        tl.Exec(feedback_fn)
277
    else:
278
      raise NotImplementedError
279

    
280
  def BuildHooksEnv(self):
281
    """Build hooks environment for this LU.
282

283
    @rtype: dict
284
    @return: Dictionary containing the environment that will be used for
285
      running the hooks for this LU. The keys of the dict must not be prefixed
286
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
287
      will extend the environment with additional variables. If no environment
288
      should be defined, an empty dictionary should be returned (not C{None}).
289
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
290
      will not be called.
291

292
    """
293
    raise NotImplementedError
294

    
295
  def BuildHooksNodes(self):
296
    """Build list of nodes to run LU's hooks.
297

298
    @rtype: tuple; (list, list)
299
    @return: Tuple containing a list of node names on which the hook
300
      should run before the execution and a list of node names on which the
301
      hook should run after the execution. No nodes should be returned as an
302
      empty list (and not None).
303
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
304
      will not be called.
305

306
    """
307
    raise NotImplementedError
308

    
309
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
310
    """Notify the LU about the results of its hooks.
311

312
    This method is called every time a hooks phase is executed, and notifies
313
    the Logical Unit about the hooks' result. The LU can then use it to alter
314
    its result based on the hooks.  By default the method does nothing and the
315
    previous result is passed back unchanged but any LU can define it if it
316
    wants to use the local cluster hook-scripts somehow.
317

318
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
319
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
320
    @param hook_results: the results of the multi-node hooks rpc call
321
    @param feedback_fn: function used to send feedback back to the caller
322
    @param lu_result: the previous Exec result this LU had, or None
323
        in the PRE phase
324
    @return: the new Exec result, based on the previous result
325
        and hook results
326

327
    """
328
    # API must be kept, thus we ignore the "unused argument" and
    # "could be a function" warnings
330
    # pylint: disable=W0613,R0201
331
    return lu_result
332

    
333
  def _ExpandAndLockInstance(self):
334
    """Helper function to expand and lock an instance.
335

336
    Many LUs that work on an instance take its name in self.op.instance_name
337
    and need to expand it and then declare the expanded name for locking. This
338
    function does it, and then updates self.op.instance_name to the expanded
339
    name. It also initializes needed_locks as a dict, if this hasn't been done
340
    before.
341

342
    """
343
    if self.needed_locks is None:
344
      self.needed_locks = {}
345
    else:
346
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
347
        "_ExpandAndLockInstance called with instance-level locks set"
348
    self.op.instance_name = _ExpandInstanceName(self.cfg,
349
                                                self.op.instance_name)
350
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
351

    
352
  def _LockInstancesNodes(self, primary_only=False):
353
    """Helper function to declare instances' nodes for locking.
354

355
    This function should be called after locking one or more instances to lock
356
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
357
    with all primary or secondary nodes for instances already locked and
358
    present in self.needed_locks[locking.LEVEL_INSTANCE].
359

360
    It should be called from DeclareLocks, and for safety only works if
361
    self.recalculate_locks[locking.LEVEL_NODE] is set.
362

363
    In the future it may grow parameters to just lock some instance's nodes, or
    to lock just primary or secondary nodes, if needed.

    It should be called from DeclareLocks in a way similar to::
367

368
      if level == locking.LEVEL_NODE:
369
        self._LockInstancesNodes()
370

371
    @type primary_only: boolean
372
    @param primary_only: only lock primary nodes of locked instances
373

374
    """
375
    assert locking.LEVEL_NODE in self.recalculate_locks, \
376
      "_LockInstancesNodes helper function called with no nodes to recalculate"
377

    
378
    # TODO: check whether we've really been called with the instance locks held
379

    
380
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
381
    # future we might want to have different behaviors depending on the value
382
    # of self.recalculate_locks[locking.LEVEL_NODE]
383
    wanted_nodes = []
384
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
385
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
386
      wanted_nodes.append(instance.primary_node)
387
      if not primary_only:
388
        wanted_nodes.extend(instance.secondary_nodes)
389

    
390
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
391
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
392
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
393
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
394

    
395
    del self.recalculate_locks[locking.LEVEL_NODE]
396

    
397

    
398
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
399
  """Simple LU which runs no hooks.
400

401
  This LU is intended as a parent for other LogicalUnits which will
402
  run no hooks, in order to reduce duplicate code.
403

404
  """
405
  HPATH = None
406
  HTYPE = None
407

    
408
  def BuildHooksEnv(self):
409
    """Empty BuildHooksEnv for NoHooksLu.
410

411
    This just raises an error.
412

413
    """
414
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
415

    
416
  def BuildHooksNodes(self):
417
    """Empty BuildHooksNodes for NoHooksLU.
418

419
    """
420
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
421

    
422

    
423
class Tasklet:
424
  """Tasklet base class.
425

426
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
427
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
428
  tasklets know nothing about locks.
429

430
  Subclasses must follow these rules:
431
    - Implement CheckPrereq
432
    - Implement Exec
433

434
  """
435
  def __init__(self, lu):
436
    self.lu = lu
437

    
438
    # Shortcuts
439
    self.cfg = lu.cfg
440
    self.rpc = lu.rpc
441

    
442
  def CheckPrereq(self):
443
    """Check prerequisites for this tasklets.
444

445
    This method should check whether the prerequisites for the execution of
446
    this tasklet are fulfilled. It can do internode communication, but it
447
    should be idempotent - no cluster or system changes are allowed.
448

449
    The method should raise errors.OpPrereqError in case something is not
450
    fulfilled. Its return value is ignored.
451

452
    This method should also update all parameters to their canonical form if it
453
    hasn't been done before.
454

455
    """
456
    pass
457

    
458
  def Exec(self, feedback_fn):
459
    """Execute the tasklet.
460

461
    This method should implement the actual work. It should raise
462
    errors.OpExecError for failures that are somewhat dealt with in code, or
463
    expected.
464

465
    """
466
    raise NotImplementedError
467
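# Illustrative sketch (not part of the original module): an LU delegates its
# work to tasklets by assigning self.tasklets in ExpandNames;
# LogicalUnit.CheckPrereq and LogicalUnit.Exec above then drive them in order.
# MyTasklet stands in for any Tasklet subclass:
#
#   def ExpandNames(self):
#     self.needed_locks = {}
#     self.tasklets = [MyTasklet(self)]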

    
468

    
469
class _QueryBase:
470
  """Base for query utility classes.
471

472
  """
473
  #: Attribute holding field definitions
474
  FIELDS = None
475

    
476
  def __init__(self, qfilter, fields, use_locking):
477
    """Initializes this class.
478

479
    """
480
    self.use_locking = use_locking
481

    
482
    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
483
                             namefield="name")
484
    self.requested_data = self.query.RequestedData()
485
    self.names = self.query.RequestedNames()
486

    
487
    # Sort only if no names were requested
488
    self.sort_by_name = not self.names
489

    
490
    self.do_locking = None
491
    self.wanted = None
492

    
493
  def _GetNames(self, lu, all_names, lock_level):
494
    """Helper function to determine names asked for in the query.
495

496
    """
497
    if self.do_locking:
498
      names = lu.owned_locks(lock_level)
499
    else:
500
      names = all_names
501

    
502
    if self.wanted == locking.ALL_SET:
503
      assert not self.names
504
      # caller didn't specify names, so ordering is not important
505
      return utils.NiceSort(names)
506

    
507
    # caller specified names and we must keep the same order
508
    assert self.names
509
    assert not self.do_locking or lu.glm.is_owned(lock_level)
510

    
511
    missing = set(self.wanted).difference(names)
512
    if missing:
513
      raise errors.OpExecError("Some items were removed before retrieving"
514
                               " their data: %s" % missing)
515

    
516
    # Return expanded names
517
    return self.wanted
518

    
519
  def ExpandNames(self, lu):
520
    """Expand names for this query.
521

522
    See L{LogicalUnit.ExpandNames}.
523

524
    """
525
    raise NotImplementedError()
526

    
527
  def DeclareLocks(self, lu, level):
528
    """Declare locks for this query.
529

530
    See L{LogicalUnit.DeclareLocks}.
531

532
    """
533
    raise NotImplementedError()
534

    
535
  def _GetQueryData(self, lu):
536
    """Collects all data for this query.
537

538
    @return: Query data object
539

540
    """
541
    raise NotImplementedError()
542

    
543
  def NewStyleQuery(self, lu):
544
    """Collect data and execute query.
545

546
    """
547
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
548
                                  sort_by_name=self.sort_by_name)
549

    
550
  def OldStyleQuery(self, lu):
551
    """Collect data and execute query.
552

553
    """
554
    return self.query.OldStyleQuery(self._GetQueryData(lu),
555
                                    sort_by_name=self.sort_by_name)
556

    
557

    
558
def _ShareAll():
559
  """Returns a dict declaring all lock levels shared.
560

561
  """
562
  return dict.fromkeys(locking.LEVELS, 1)
563
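# Example usage (illustrative only): an LU that merely reads cluster state can
# declare every lock level as shared in one go:
#
#   def ExpandNames(self):
#     self.share_locks = _ShareAll()
#     self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}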

    
564

    
565
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
566
  """Checks if the owned node groups are still correct for an instance.
567

568
  @type cfg: L{config.ConfigWriter}
569
  @param cfg: The cluster configuration
570
  @type instance_name: string
571
  @param instance_name: Instance name
572
  @type owned_groups: set or frozenset
573
  @param owned_groups: List of currently owned node groups
574

575
  """
576
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
577

    
578
  if not owned_groups.issuperset(inst_groups):
579
    raise errors.OpPrereqError("Instance %s's node groups changed since"
580
                               " locks were acquired, current groups are"
581
                               " are '%s', owning groups '%s'; retry the"
582
                               " operation" %
583
                               (instance_name,
584
                                utils.CommaJoin(inst_groups),
585
                                utils.CommaJoin(owned_groups)),
586
                               errors.ECODE_STATE)
587

    
588
  return inst_groups
589

    
590

    
591
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
592
  """Checks if the instances in a node group are still correct.
593

594
  @type cfg: L{config.ConfigWriter}
595
  @param cfg: The cluster configuration
596
  @type group_uuid: string
597
  @param group_uuid: Node group UUID
598
  @type owned_instances: set or frozenset
599
  @param owned_instances: List of currently owned instances
600

601
  """
602
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
603
  if owned_instances != wanted_instances:
604
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
605
                               " locks were acquired, wanted '%s', have '%s';"
606
                               " retry the operation" %
607
                               (group_uuid,
608
                                utils.CommaJoin(wanted_instances),
609
                                utils.CommaJoin(owned_instances)),
610
                               errors.ECODE_STATE)
611

    
612
  return wanted_instances
613

    
614

    
615
def _SupportsOob(cfg, node):
616
  """Tells if node supports OOB.
617

618
  @type cfg: L{config.ConfigWriter}
619
  @param cfg: The cluster configuration
620
  @type node: L{objects.Node}
621
  @param node: The node
622
  @return: The OOB script if supported or an empty string otherwise
623

624
  """
625
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
626

    
627

    
628
def _GetWantedNodes(lu, nodes):
629
  """Returns list of checked and expanded node names.
630

631
  @type lu: L{LogicalUnit}
632
  @param lu: the logical unit on whose behalf we execute
633
  @type nodes: list
634
  @param nodes: list of node names or None for all nodes
635
  @rtype: list
636
  @return: the list of nodes, sorted
637
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
638

639
  """
640
  if nodes:
641
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
642

    
643
  return utils.NiceSort(lu.cfg.GetNodeList())
644

    
645

    
646
def _GetWantedInstances(lu, instances):
647
  """Returns list of checked and expanded instance names.
648

649
  @type lu: L{LogicalUnit}
650
  @param lu: the logical unit on whose behalf we execute
651
  @type instances: list
652
  @param instances: list of instance names or None for all instances
653
  @rtype: list
654
  @return: the list of instances, sorted
655
  @raise errors.OpPrereqError: if the instances parameter is wrong type
656
  @raise errors.OpPrereqError: if any of the passed instances is not found
657

658
  """
659
  if instances:
660
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
661
  else:
662
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
663
  return wanted
664

    
665

    
666
def _GetUpdatedParams(old_params, update_dict,
667
                      use_default=True, use_none=False):
668
  """Return the new version of a parameter dictionary.
669

670
  @type old_params: dict
671
  @param old_params: old parameters
672
  @type update_dict: dict
673
  @param update_dict: dict containing new parameter values, or
674
      constants.VALUE_DEFAULT to reset the parameter to its default
675
      value
676
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
682
  @rtype: dict
683
  @return: the new parameter dictionary
684

685
  """
686
  params_copy = copy.deepcopy(old_params)
687
  for key, val in update_dict.iteritems():
688
    if ((use_default and val == constants.VALUE_DEFAULT) or
689
        (use_none and val is None)):
690
      try:
691
        del params_copy[key]
692
      except KeyError:
693
        pass
694
    else:
695
      params_copy[key] = val
696
  return params_copy
697
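# Worked example (illustrative values): with use_default=True,
#
#   _GetUpdatedParams({"a": 1, "b": 2},
#                     {"a": constants.VALUE_DEFAULT, "c": 3})
#
# returns {"b": 2, "c": 3}: the VALUE_DEFAULT marker deletes "a" so that the
# cluster-level default applies again, "c" is added and "b" is kept unchanged.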

    
698

    
699
def _ReleaseLocks(lu, level, names=None, keep=None):
700
  """Releases locks owned by an LU.
701

702
  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf the locks are released
  @type level: member of L{locking.LEVELS}
  @param level: Lock level
704
  @type names: list or None
705
  @param names: Names of locks to release
706
  @type keep: list or None
707
  @param keep: Names of locks to retain
708

709
  """
710
  assert not (keep is not None and names is not None), \
711
         "Only one of the 'names' and the 'keep' parameters can be given"
712

    
713
  if names is not None:
714
    should_release = names.__contains__
715
  elif keep:
716
    should_release = lambda name: name not in keep
717
  else:
718
    should_release = None
719

    
720
  if should_release:
721
    retain = []
722
    release = []
723

    
724
    # Determine which locks to release
725
    for name in lu.owned_locks(level):
726
      if should_release(name):
727
        release.append(name)
728
      else:
729
        retain.append(name)
730

    
731
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
732

    
733
    # Release just some locks
734
    lu.glm.release(level, names=release)
735

    
736
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
737
  else:
738
    # Release everything
739
    lu.glm.release(level)
740

    
741
    assert not lu.glm.is_owned(level), "No locks should be owned"
742
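# Illustrative usage (not from the original source): after an LU has worked
# out which nodes it really needs, it can drop the remaining node locks:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[self.instance.primary_node])
#
# Passing neither 'names' nor 'keep' releases every lock held at that level.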

    
743

    
744
def _MapInstanceDisksToNodes(instances):
745
  """Creates a map from (node, volume) to instance name.
746

747
  @type instances: list of L{objects.Instance}
748
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
749

750
  """
751
  return dict(((node, vol), inst.name)
752
              for inst in instances
753
              for (node, vols) in inst.MapLVsByNode().items()
754
              for vol in vols)
755
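# Shape of the result (illustrative values): for an instance "inst1" with a
# single logical volume on "node1" the returned dict would contain
#
#   {("node1", "xenvg/disk0"): "inst1"}
#
# i.e. each (node name, volume name) pair maps back to the owning instance.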

    
756

    
757
def _RunPostHook(lu, node_name):
758
  """Runs the post-hook for an opcode on a single node.
759

760
  """
761
  hm = lu.proc.BuildHooksManager(lu)
762
  try:
763
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
764
  except:
765
    # pylint: disable=W0702
766
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
767

    
768

    
769
def _CheckOutputFields(static, dynamic, selected):
770
  """Checks whether all selected fields are valid.
771

772
  @type static: L{utils.FieldSet}
773
  @param static: static fields set
774
  @type dynamic: L{utils.FieldSet}
775
  @param dynamic: dynamic fields set
776

777
  """
778
  f = utils.FieldSet()
779
  f.Extend(static)
780
  f.Extend(dynamic)
781

    
782
  delta = f.NonMatching(selected)
783
  if delta:
784
    raise errors.OpPrereqError("Unknown output fields selected: %s"
785
                               % ",".join(delta), errors.ECODE_INVAL)
786

    
787

    
788
def _CheckGlobalHvParams(params):
789
  """Validates that given hypervisor params are not global ones.
790

791
  This will ensure that instances don't get customised versions of
792
  global params.
793

794
  """
795
  used_globals = constants.HVC_GLOBALS.intersection(params)
796
  if used_globals:
797
    msg = ("The following hypervisor parameters are global and cannot"
798
           " be customized at instance level, please modify them at"
799
           " cluster level: %s" % utils.CommaJoin(used_globals))
800
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
801

    
802

    
803
def _CheckNodeOnline(lu, node, msg=None):
804
  """Ensure that a given node is online.
805

806
  @param lu: the LU on behalf of which we make the check
807
  @param node: the node to check
808
  @param msg: if passed, should be a message to replace the default one
809
  @raise errors.OpPrereqError: if the node is offline
810

811
  """
812
  if msg is None:
813
    msg = "Can't use offline node"
814
  if lu.cfg.GetNodeInfo(node).offline:
815
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
816

    
817

    
818
def _CheckNodeNotDrained(lu, node):
819
  """Ensure that a given node is not drained.
820

821
  @param lu: the LU on behalf of which we make the check
822
  @param node: the node to check
823
  @raise errors.OpPrereqError: if the node is drained
824

825
  """
826
  if lu.cfg.GetNodeInfo(node).drained:
827
    raise errors.OpPrereqError("Can't use drained node %s" % node,
828
                               errors.ECODE_STATE)
829

    
830

    
831
def _CheckNodeVmCapable(lu, node):
832
  """Ensure that a given node is vm capable.
833

834
  @param lu: the LU on behalf of which we make the check
835
  @param node: the node to check
836
  @raise errors.OpPrereqError: if the node is not vm capable
837

838
  """
839
  if not lu.cfg.GetNodeInfo(node).vm_capable:
840
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
841
                               errors.ECODE_STATE)
842

    
843

    
844
def _CheckNodeHasOS(lu, node, os_name, force_variant):
845
  """Ensure that a node supports a given OS.
846

847
  @param lu: the LU on behalf of which we make the check
848
  @param node: the node to check
849
  @param os_name: the OS to query about
850
  @param force_variant: whether to ignore variant errors
851
  @raise errors.OpPrereqError: if the node is not supporting the OS
852

853
  """
854
  result = lu.rpc.call_os_get(node, os_name)
855
  result.Raise("OS '%s' not in supported OS list for node %s" %
856
               (os_name, node),
857
               prereq=True, ecode=errors.ECODE_INVAL)
858
  if not force_variant:
859
    _CheckOSVariant(result.payload, os_name)
860

    
861

    
862
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
863
  """Ensure that a node has the given secondary ip.
864

865
  @type lu: L{LogicalUnit}
866
  @param lu: the LU on behalf of which we make the check
867
  @type node: string
868
  @param node: the node to check
869
  @type secondary_ip: string
870
  @param secondary_ip: the ip to check
871
  @type prereq: boolean
872
  @param prereq: whether to throw a prerequisite or an execute error
873
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
874
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
875

876
  """
877
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
878
  result.Raise("Failure checking secondary ip on node %s" % node,
879
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
880
  if not result.payload:
881
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
882
           " please fix and re-run this command" % secondary_ip)
883
    if prereq:
884
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
885
    else:
886
      raise errors.OpExecError(msg)
887

    
888

    
889
def _GetClusterDomainSecret():
890
  """Reads the cluster domain secret.
891

892
  """
893
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
894
                               strict=True)
895

    
896

    
897
def _CheckInstanceDown(lu, instance, reason):
898
  """Ensure that an instance is not running."""
899
  if instance.admin_up:
900
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
901
                               (instance.name, reason), errors.ECODE_STATE)
902

    
903
  pnode = instance.primary_node
904
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
905
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
906
              prereq=True, ecode=errors.ECODE_ENVIRON)
907

    
908
  if instance.name in ins_l.payload:
909
    raise errors.OpPrereqError("Instance %s is running, %s" %
910
                               (instance.name, reason), errors.ECODE_STATE)
911

    
912

    
913
def _ExpandItemName(fn, name, kind):
914
  """Expand an item name.
915

916
  @param fn: the function to use for expansion
917
  @param name: requested item name
918
  @param kind: text description ('Node' or 'Instance')
919
  @return: the resolved (full) name
920
  @raise errors.OpPrereqError: if the item is not found
921

922
  """
923
  full_name = fn(name)
924
  if full_name is None:
925
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
926
                               errors.ECODE_NOENT)
927
  return full_name
928

    
929

    
930
def _ExpandNodeName(cfg, name):
931
  """Wrapper over L{_ExpandItemName} for nodes."""
932
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
933

    
934

    
935
def _ExpandInstanceName(cfg, name):
936
  """Wrapper over L{_ExpandItemName} for instance."""
937
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
938

    
939

    
940
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
941
                          memory, vcpus, nics, disk_template, disks,
942
                          bep, hvp, hypervisor_name, tags):
943
  """Builds instance related env variables for hooks
944

945
  This builds the hook environment from individual variables.
946

947
  @type name: string
948
  @param name: the name of the instance
949
  @type primary_node: string
950
  @param primary_node: the name of the instance's primary node
951
  @type secondary_nodes: list
952
  @param secondary_nodes: list of secondary nodes as strings
953
  @type os_type: string
954
  @param os_type: the name of the instance's OS
955
  @type status: boolean
956
  @param status: the should_run status of the instance
957
  @type memory: string
958
  @param memory: the memory size of the instance
959
  @type vcpus: string
960
  @param vcpus: the count of VCPUs the instance has
961
  @type nics: list
962
  @param nics: list of tuples (ip, mac, mode, link) representing
963
      the NICs the instance has
964
  @type disk_template: string
965
  @param disk_template: the disk template of the instance
966
  @type disks: list
967
  @param disks: the list of (size, mode) pairs
968
  @type bep: dict
969
  @param bep: the backend parameters for the instance
970
  @type hvp: dict
971
  @param hvp: the hypervisor parameters for the instance
972
  @type hypervisor_name: string
973
  @param hypervisor_name: the hypervisor for the instance
974
  @type tags: list
975
  @param tags: list of instance tags as strings
976
  @rtype: dict
977
  @return: the hook environment for this instance
978

979
  """
980
  if status:
981
    str_status = "up"
982
  else:
983
    str_status = "down"
984
  env = {
985
    "OP_TARGET": name,
986
    "INSTANCE_NAME": name,
987
    "INSTANCE_PRIMARY": primary_node,
988
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
989
    "INSTANCE_OS_TYPE": os_type,
990
    "INSTANCE_STATUS": str_status,
991
    "INSTANCE_MEMORY": memory,
992
    "INSTANCE_VCPUS": vcpus,
993
    "INSTANCE_DISK_TEMPLATE": disk_template,
994
    "INSTANCE_HYPERVISOR": hypervisor_name,
995
  }
996

    
997
  if nics:
998
    nic_count = len(nics)
999
    for idx, (ip, mac, mode, link) in enumerate(nics):
1000
      if ip is None:
1001
        ip = ""
1002
      env["INSTANCE_NIC%d_IP" % idx] = ip
1003
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1004
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1005
      env["INSTANCE_NIC%d_LINK" % idx] = link
1006
      if mode == constants.NIC_MODE_BRIDGED:
1007
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1008
  else:
1009
    nic_count = 0
1010

    
1011
  env["INSTANCE_NIC_COUNT"] = nic_count
1012

    
1013
  if disks:
1014
    disk_count = len(disks)
1015
    for idx, (size, mode) in enumerate(disks):
1016
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1017
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1018
  else:
1019
    disk_count = 0
1020

    
1021
  env["INSTANCE_DISK_COUNT"] = disk_count
1022

    
1023
  if not tags:
1024
    tags = []
1025

    
1026
  env["INSTANCE_TAGS"] = " ".join(tags)
1027

    
1028
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1029
    for key, value in source.items():
1030
      env["INSTANCE_%s_%s" % (kind, key)] = value
1031

    
1032
  return env
1033
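# Example of the resulting environment (illustrative values only): for an
# instance with one bridged NIC and one 10 GiB disk the dict would include
#
#   INSTANCE_NAME=inst1.example.com    INSTANCE_STATUS=up
#   INSTANCE_NIC_COUNT=1               INSTANCE_NIC0_MODE=bridged
#   INSTANCE_DISK_COUNT=1              INSTANCE_DISK0_SIZE=10240
#
# The hooks runner prefixes each key with "GANETI_" before exporting it.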

    
1034

    
1035
def _NICListToTuple(lu, nics):
1036
  """Build a list of nic information tuples.
1037

1038
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1039
  value in LUInstanceQueryData.
1040

1041
  @type lu:  L{LogicalUnit}
1042
  @param lu: the logical unit on whose behalf we execute
1043
  @type nics: list of L{objects.NIC}
1044
  @param nics: list of nics to convert to hooks tuples
1045

1046
  """
1047
  hooks_nics = []
1048
  cluster = lu.cfg.GetClusterInfo()
1049
  for nic in nics:
1050
    ip = nic.ip
1051
    mac = nic.mac
1052
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1053
    mode = filled_params[constants.NIC_MODE]
1054
    link = filled_params[constants.NIC_LINK]
1055
    hooks_nics.append((ip, mac, mode, link))
1056
  return hooks_nics
1057

    
1058

    
1059
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1060
  """Builds instance related env variables for hooks from an object.
1061

1062
  @type lu: L{LogicalUnit}
1063
  @param lu: the logical unit on whose behalf we execute
1064
  @type instance: L{objects.Instance}
1065
  @param instance: the instance for which we should build the
1066
      environment
1067
  @type override: dict
1068
  @param override: dictionary with key/values that will override
1069
      our values
1070
  @rtype: dict
1071
  @return: the hook environment dictionary
1072

1073
  """
1074
  cluster = lu.cfg.GetClusterInfo()
1075
  bep = cluster.FillBE(instance)
1076
  hvp = cluster.FillHV(instance)
1077
  args = {
1078
    "name": instance.name,
1079
    "primary_node": instance.primary_node,
1080
    "secondary_nodes": instance.secondary_nodes,
1081
    "os_type": instance.os,
1082
    "status": instance.admin_up,
1083
    "memory": bep[constants.BE_MEMORY],
1084
    "vcpus": bep[constants.BE_VCPUS],
1085
    "nics": _NICListToTuple(lu, instance.nics),
1086
    "disk_template": instance.disk_template,
1087
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1088
    "bep": bep,
1089
    "hvp": hvp,
1090
    "hypervisor_name": instance.hypervisor,
1091
    "tags": instance.tags,
1092
  }
1093
  if override:
1094
    args.update(override)
1095
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1096

    
1097

    
1098
def _AdjustCandidatePool(lu, exceptions):
1099
  """Adjust the candidate pool after node operations.
1100

1101
  """
1102
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1103
  if mod_list:
1104
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1105
               utils.CommaJoin(node.name for node in mod_list))
1106
    for name in mod_list:
1107
      lu.context.ReaddNode(name)
1108
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1109
  if mc_now > mc_max:
1110
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1111
               (mc_now, mc_max))
1112

    
1113

    
1114
def _DecideSelfPromotion(lu, exceptions=None):
1115
  """Decide whether I should promote myself as a master candidate.
1116

1117
  """
1118
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1119
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1120
  # the new node will increase mc_max with one, so:
1121
  mc_should = min(mc_should + 1, cp_size)
1122
  return mc_now < mc_should
1123

    
1124

    
1125
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1126
  """Check that the brigdes needed by a list of nics exist.
1127

1128
  """
1129
  cluster = lu.cfg.GetClusterInfo()
1130
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1131
  brlist = [params[constants.NIC_LINK] for params in paramslist
1132
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1133
  if brlist:
1134
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1135
    result.Raise("Error checking bridges on destination node '%s'" %
1136
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1137

    
1138

    
1139
def _CheckInstanceBridgesExist(lu, instance, node=None):
1140
  """Check that the brigdes needed by an instance exist.
1141

1142
  """
1143
  if node is None:
1144
    node = instance.primary_node
1145
  _CheckNicsBridgesExist(lu, instance.nics, node)
1146

    
1147

    
1148
def _CheckOSVariant(os_obj, name):
1149
  """Check whether an OS name conforms to the os variants specification.
1150

1151
  @type os_obj: L{objects.OS}
1152
  @param os_obj: OS object to check
1153
  @type name: string
1154
  @param name: OS name passed by the user, to check for validity
1155

1156
  """
1157
  variant = objects.OS.GetVariant(name)
1158
  if not os_obj.supported_variants:
1159
    if variant:
1160
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1161
                                 " passed)" % (os_obj.name, variant),
1162
                                 errors.ECODE_INVAL)
1163
    return
1164
  if not variant:
1165
    raise errors.OpPrereqError("OS name must include a variant",
1166
                               errors.ECODE_INVAL)
1167

    
1168
  if variant not in os_obj.supported_variants:
1169
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1170

    
1171

    
1172
def _GetNodeInstancesInner(cfg, fn):
1173
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1174

    
1175

    
1176
def _GetNodeInstances(cfg, node_name):
1177
  """Returns a list of all primary and secondary instances on a node.
1178

1179
  """
1180

    
1181
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1182

    
1183

    
1184
def _GetNodePrimaryInstances(cfg, node_name):
1185
  """Returns primary instances on a node.
1186

1187
  """
1188
  return _GetNodeInstancesInner(cfg,
1189
                                lambda inst: node_name == inst.primary_node)
1190

    
1191

    
1192
def _GetNodeSecondaryInstances(cfg, node_name):
1193
  """Returns secondary instances on a node.
1194

1195
  """
1196
  return _GetNodeInstancesInner(cfg,
1197
                                lambda inst: node_name in inst.secondary_nodes)
1198

    
1199

    
1200
def _GetStorageTypeArgs(cfg, storage_type):
1201
  """Returns the arguments for a storage type.
1202

1203
  """
1204
  # Special case for file storage
1205
  if storage_type == constants.ST_FILE:
1206
    # storage.FileStorage wants a list of storage directories
1207
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1208

    
1209
  return []
1210

    
1211

    
1212
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1213
  faulty = []
1214

    
1215
  for dev in instance.disks:
1216
    cfg.SetDiskID(dev, node_name)
1217

    
1218
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
1219
  result.Raise("Failed to get disk status from node %s" % node_name,
1220
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1221

    
1222
  for idx, bdev_status in enumerate(result.payload):
1223
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1224
      faulty.append(idx)
1225

    
1226
  return faulty
1227

    
1228

    
1229
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1230
  """Check the sanity of iallocator and node arguments and use the
1231
  cluster-wide iallocator if appropriate.
1232

1233
  Check that at most one of (iallocator, node) is specified. If none is
1234
  specified, then the LU's opcode's iallocator slot is filled with the
1235
  cluster-wide default iallocator.
1236

1237
  @type iallocator_slot: string
1238
  @param iallocator_slot: the name of the opcode iallocator slot
1239
  @type node_slot: string
1240
  @param node_slot: the name of the opcode target node slot
1241

1242
  """
1243
  node = getattr(lu.op, node_slot, None)
1244
  iallocator = getattr(lu.op, iallocator_slot, None)
1245

    
1246
  if node is not None and iallocator is not None:
1247
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1248
                               errors.ECODE_INVAL)
1249
  elif node is None and iallocator is None:
1250
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1251
    if default_iallocator:
1252
      setattr(lu.op, iallocator_slot, default_iallocator)
1253
    else:
1254
      raise errors.OpPrereqError("No iallocator or node given and no"
1255
                                 " cluster-wide default iallocator found;"
1256
                                 " please specify either an iallocator or a"
1257
                                 " node, or set a cluster-wide default"
1258
                                 " iallocator")
1259
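# Illustrative usage (not from the original source): opcode-validating code
# typically calls this from CheckArguments, naming the two opcode slots it
# wants checked; the slot names below are only an example:
#
#   def CheckArguments(self):
#     _CheckIAllocatorOrNode(self, "iallocator", "remote_node")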

    
1260

    
1261
def _GetDefaultIAllocator(cfg, iallocator):
1262
  """Decides on which iallocator to use.
1263

1264
  @type cfg: L{config.ConfigWriter}
1265
  @param cfg: Cluster configuration object
1266
  @type iallocator: string or None
1267
  @param iallocator: Iallocator specified in opcode
1268
  @rtype: string
1269
  @return: Iallocator name
1270

1271
  """
1272
  if not iallocator:
1273
    # Use default iallocator
1274
    iallocator = cfg.GetDefaultIAllocator()
1275

    
1276
  if not iallocator:
1277
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1278
                               " opcode nor as a cluster-wide default",
1279
                               errors.ECODE_INVAL)
1280

    
1281
  return iallocator
1282

    
1283

    
1284
class LUClusterPostInit(LogicalUnit):
1285
  """Logical unit for running hooks after cluster initialization.
1286

1287
  """
1288
  HPATH = "cluster-init"
1289
  HTYPE = constants.HTYPE_CLUSTER
1290

    
1291
  def BuildHooksEnv(self):
1292
    """Build hooks env.
1293

1294
    """
1295
    return {
1296
      "OP_TARGET": self.cfg.GetClusterName(),
1297
      }
1298

    
1299
  def BuildHooksNodes(self):
1300
    """Build hooks nodes.
1301

1302
    """
1303
    return ([], [self.cfg.GetMasterNode()])
1304

    
1305
  def Exec(self, feedback_fn):
1306
    """Nothing to do.
1307

1308
    """
1309
    return True
1310

    
1311

    
1312
class LUClusterDestroy(LogicalUnit):
1313
  """Logical unit for destroying the cluster.
1314

1315
  """
1316
  HPATH = "cluster-destroy"
1317
  HTYPE = constants.HTYPE_CLUSTER
1318

    
1319
  def BuildHooksEnv(self):
1320
    """Build hooks env.
1321

1322
    """
1323
    return {
1324
      "OP_TARGET": self.cfg.GetClusterName(),
1325
      }
1326

    
1327
  def BuildHooksNodes(self):
1328
    """Build hooks nodes.
1329

1330
    """
1331
    return ([], [])
1332

    
1333
  def CheckPrereq(self):
1334
    """Check prerequisites.
1335

1336
    This checks whether the cluster is empty.
1337

1338
    Any errors are signaled by raising errors.OpPrereqError.
1339

1340
    """
1341
    master = self.cfg.GetMasterNode()
1342

    
1343
    nodelist = self.cfg.GetNodeList()
1344
    if len(nodelist) != 1 or nodelist[0] != master:
1345
      raise errors.OpPrereqError("There are still %d node(s) in"
1346
                                 " this cluster." % (len(nodelist) - 1),
1347
                                 errors.ECODE_INVAL)
1348
    instancelist = self.cfg.GetInstanceList()
1349
    if instancelist:
1350
      raise errors.OpPrereqError("There are still %d instance(s) in"
1351
                                 " this cluster." % len(instancelist),
1352
                                 errors.ECODE_INVAL)
1353

    
1354
  def Exec(self, feedback_fn):
1355
    """Destroys the cluster.
1356

1357
    """
1358
    (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
1359

    
1360
    # Run post hooks on master node before it's removed
1361
    _RunPostHook(self, master)
1362

    
1363
    result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev,
1364
                                                     family)
1365
    result.Raise("Could not disable the master role")
1366

    
1367
    return master
1368

    
1369

    
1370
def _VerifyCertificate(filename):
1371
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1372

1373
  @type filename: string
1374
  @param filename: Path to PEM file
1375

1376
  """
1377
  try:
1378
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1379
                                           utils.ReadFile(filename))
1380
  except Exception, err: # pylint: disable=W0703
1381
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1382
            "Failed to load X509 certificate %s: %s" % (filename, err))
1383

    
1384
  (errcode, msg) = \
1385
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1386
                                constants.SSL_CERT_EXPIRATION_ERROR)
1387

    
1388
  if msg:
1389
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1390
  else:
1391
    fnamemsg = None
1392

    
1393
  if errcode is None:
1394
    return (None, fnamemsg)
1395
  elif errcode == utils.CERT_WARNING:
1396
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1397
  elif errcode == utils.CERT_ERROR:
1398
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1399

    
1400
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1401

    
1402

    
1403
def _GetAllHypervisorParameters(cluster, instances):
1404
  """Compute the set of all hypervisor parameters.
1405

1406
  @type cluster: L{objects.Cluster}
1407
  @param cluster: the cluster object
1408
  @type instances: list of L{objects.Instance}
1409
  @param instances: additional instances from which to obtain parameters
1410
  @rtype: list of (origin, hypervisor, parameters)
1411
  @return: a list with all parameters found, indicating the hypervisor they
1412
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1413

1414
  """
1415
  hvp_data = []
1416

    
1417
  for hv_name in cluster.enabled_hypervisors:
1418
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1419

    
1420
  for os_name, os_hvp in cluster.os_hvp.items():
1421
    for hv_name, hv_params in os_hvp.items():
1422
      if hv_params:
1423
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1424
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1425

    
1426
  # TODO: collapse identical parameter values in a single one
1427
  for instance in instances:
1428
    if instance.hvparams:
1429
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1430
                       cluster.FillHV(instance)))
1431

    
1432
  return hvp_data
1433

    
1434

    
1435
class _VerifyErrors(object):
1436
  """Mix-in for cluster/group verify LUs.
1437

1438
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1439
  self.op and self._feedback_fn to be available.)
1440

1441
  """
1442

    
1443
  ETYPE_FIELD = "code"
1444
  ETYPE_ERROR = "ERROR"
1445
  ETYPE_WARNING = "WARNING"
1446

    
1447
  def _Error(self, ecode, item, msg, *args, **kwargs):
1448
    """Format an error message.
1449

1450
    Based on the opcode's error_codes parameter, either format a
1451
    parseable error code, or a simpler error string.
1452

1453
    This must be called only from Exec and functions called from Exec.
1454

1455
    """
1456
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1457
    itype, etxt, _ = ecode
1458
    # first complete the msg
1459
    if args:
1460
      msg = msg % args
1461
    # then format the whole message
1462
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1463
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1464
    else:
1465
      if item:
1466
        item = " " + item
1467
      else:
1468
        item = ""
1469
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1470
    # and finally report it via the feedback_fn
1471
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1472

    
1473
  def _ErrorIf(self, cond, ecode, *args, **kwargs):
1474
    """Log an error message if the passed condition is True.
1475

1476
    """
1477
    cond = (bool(cond)
1478
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1479

    
1480
    # If the error code is in the list of ignored errors, demote the error to a
1481
    # warning
1482
    (_, etxt, _) = ecode
1483
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1484
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1485

    
1486
    if cond:
1487
      self._Error(ecode, *args, **kwargs)
1488

    
1489
    # do not mark the operation as failed for WARN cases only
1490
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1491
      self.bad = self.bad or cond
1492
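  # Illustrative usage (mirrors LUClusterVerifyConfig.Exec below): a verify LU
  # reports problems through _ErrorIf and finally returns "not self.bad":
  #
  #   self._ErrorIf(bool(problems), constants.CV_ECLUSTERCFG, None,
  #                 "configuration problems found: %s",
  #                 utils.CommaJoin(problems))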

    
1493

    
1494
class LUClusterVerify(NoHooksLU):
1495
  """Submits all jobs necessary to verify the cluster.
1496

1497
  """
1498
  REQ_BGL = False
1499

    
1500
  def ExpandNames(self):
1501
    self.needed_locks = {}
1502

    
1503
  def Exec(self, feedback_fn):
1504
    jobs = []
1505

    
1506
    if self.op.group_name:
1507
      groups = [self.op.group_name]
1508
      depends_fn = lambda: None
1509
    else:
1510
      groups = self.cfg.GetNodeGroupList()
1511

    
1512
      # Verify global configuration
1513
      jobs.append([
1514
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1515
        ])
1516

    
1517
      # Always depend on global verification
1518
      depends_fn = lambda: [(-len(jobs), [])]
1519

    
1520
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1521
                                            ignore_errors=self.op.ignore_errors,
1522
                                            depends=depends_fn())]
1523
                for group in groups)
1524

    
1525
    # Fix up all parameters
1526
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1527
      op.debug_simulate_errors = self.op.debug_simulate_errors
1528
      op.verbose = self.op.verbose
1529
      op.error_codes = self.op.error_codes
1530
      try:
1531
        op.skip_checks = self.op.skip_checks
1532
      except AttributeError:
1533
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1534

    
1535
    return ResultWithJobs(jobs)
1536

    
1537

    
1538
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1539
  """Verifies the cluster config.
1540

1541
  """
1542
  REQ_BGL = True
1543

    
1544
  def _VerifyHVP(self, hvp_data):
1545
    """Verifies locally the syntax of the hypervisor parameters.
1546

1547
    """
1548
    for item, hv_name, hv_params in hvp_data:
1549
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1550
             (item, hv_name))
1551
      try:
1552
        hv_class = hypervisor.GetHypervisor(hv_name)
1553
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1554
        hv_class.CheckParameterSyntax(hv_params)
1555
      except errors.GenericError, err:
1556
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1557

    
1558
  def ExpandNames(self):
1559
    # Information can be safely retrieved as the BGL is acquired in exclusive
1560
    # mode
1561
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1562
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1563
    self.all_node_info = self.cfg.GetAllNodesInfo()
1564
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1565
    self.needed_locks = {}
1566

    
1567
  def Exec(self, feedback_fn):
1568
    """Verify integrity of cluster, performing various test on nodes.
1569

1570
    """
1571
    self.bad = False
1572
    self._feedback_fn = feedback_fn
1573

    
1574
    feedback_fn("* Verifying cluster config")
1575

    
1576
    for msg in self.cfg.VerifyConfig():
1577
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1578

    
1579
    feedback_fn("* Verifying cluster certificate files")
1580

    
1581
    for cert_filename in constants.ALL_CERT_FILES:
1582
      (errcode, msg) = _VerifyCertificate(cert_filename)
1583
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1584

    
1585
    feedback_fn("* Verifying hypervisor parameters")
1586

    
1587
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1588
                                                self.all_inst_info.values()))
1589

    
1590
    feedback_fn("* Verifying all nodes belong to an existing group")
1591

    
1592
    # We do this verification here because, should this bogus circumstance
1593
    # occur, it would never be caught by VerifyGroup, which only acts on
1594
    # nodes/instances reachable from existing node groups.
1595

    
1596
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1597
                         if node.group not in self.all_group_info)
1598

    
1599
    dangling_instances = {}
1600
    no_node_instances = []
1601

    
1602
    for inst in self.all_inst_info.values():
1603
      if inst.primary_node in dangling_nodes:
1604
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1605
      elif inst.primary_node not in self.all_node_info:
1606
        no_node_instances.append(inst.name)
1607

    
1608
    pretty_dangling = [
1609
        "%s (%s)" %
1610
        (node.name,
1611
         utils.CommaJoin(dangling_instances.get(node.name,
1612
                                                ["no instances"])))
1613
        for node in dangling_nodes]
1614

    
1615
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1616
                  None,
1617
                  "the following nodes (and their instances) belong to a non"
1618
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1619

    
1620
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1621
                  None,
1622
                  "the following instances have a non-existing primary-node:"
1623
                  " %s", utils.CommaJoin(no_node_instances))
1624

    
1625
    return not self.bad
1626

    
1627

    
1628
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1629
  """Verifies the status of a node group.
1630

1631
  """
1632
  HPATH = "cluster-verify"
1633
  HTYPE = constants.HTYPE_CLUSTER
1634
  REQ_BGL = False
1635

    
1636
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1637

    
1638
  class NodeImage(object):
1639
    """A class representing the logical and physical status of a node.
1640

1641
    @type name: string
1642
    @ivar name: the node name to which this object refers
1643
    @ivar volumes: a structure as returned from
1644
        L{ganeti.backend.GetVolumeList} (runtime)
1645
    @ivar instances: a list of running instances (runtime)
1646
    @ivar pinst: list of configured primary instances (config)
1647
    @ivar sinst: list of configured secondary instances (config)
1648
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1649
        instances for which this node is secondary (config)
1650
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1651
    @ivar dfree: free disk, as reported by the node (runtime)
1652
    @ivar offline: the offline status (config)
1653
    @type rpc_fail: boolean
1654
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1655
        not whether the individual keys were correct) (runtime)
1656
    @type lvm_fail: boolean
1657
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1658
    @type hyp_fail: boolean
1659
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1660
    @type ghost: boolean
1661
    @ivar ghost: whether this is a known node or not (config)
1662
    @type os_fail: boolean
1663
    @ivar os_fail: whether the RPC call didn't return valid OS data
1664
    @type oslist: list
1665
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1666
    @type vm_capable: boolean
1667
    @ivar vm_capable: whether the node can host instances
1668

1669
    """
1670
    def __init__(self, offline=False, name=None, vm_capable=True):
1671
      self.name = name
1672
      self.volumes = {}
1673
      self.instances = []
1674
      self.pinst = []
1675
      self.sinst = []
1676
      self.sbp = {}
1677
      self.mfree = 0
1678
      self.dfree = 0
1679
      self.offline = offline
1680
      self.vm_capable = vm_capable
1681
      self.rpc_fail = False
1682
      self.lvm_fail = False
1683
      self.hyp_fail = False
1684
      self.ghost = False
1685
      self.os_fail = False
1686
      self.oslist = {}
1687

    
1688
  def ExpandNames(self):
1689
    # This raises errors.OpPrereqError on its own:
1690
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1691

    
1692
    # Get instances in node group; this is unsafe and needs verification later
1693
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
1694

    
1695
    self.needed_locks = {
1696
      locking.LEVEL_INSTANCE: inst_names,
1697
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1698
      locking.LEVEL_NODE: [],
1699
      }
1700

    
1701
    self.share_locks = _ShareAll()
1702

    
1703
  def DeclareLocks(self, level):
1704
    if level == locking.LEVEL_NODE:
1705
      # Get members of node group; this is unsafe and needs verification later
1706
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1707

    
1708
      all_inst_info = self.cfg.GetAllInstancesInfo()
1709

    
1710
      # In Exec(), we warn about mirrored instances that have primary and
1711
      # secondary living in separate node groups. To fully verify that
1712
      # volumes for these instances are healthy, we will need to do an
1713
      # extra call to their secondaries. We ensure here those nodes will
1714
      # be locked.
1715
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1716
        # Important: access only the instances whose lock is owned
1717
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1718
          nodes.update(all_inst_info[inst].secondary_nodes)
1719

    
1720
      self.needed_locks[locking.LEVEL_NODE] = nodes
1721

    
1722
  def CheckPrereq(self):
1723
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1724
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1725

    
1726
    group_nodes = set(self.group_info.members)
1727
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
1728

    
1729
    unlocked_nodes = \
1730
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1731

    
1732
    unlocked_instances = \
1733
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1734

    
1735
    if unlocked_nodes:
1736
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1737
                                 utils.CommaJoin(unlocked_nodes))
1738

    
1739
    if unlocked_instances:
1740
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1741
                                 utils.CommaJoin(unlocked_instances))
1742

    
1743
    self.all_node_info = self.cfg.GetAllNodesInfo()
1744
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1745

    
1746
    self.my_node_names = utils.NiceSort(group_nodes)
1747
    self.my_inst_names = utils.NiceSort(group_instances)
1748

    
1749
    self.my_node_info = dict((name, self.all_node_info[name])
1750
                             for name in self.my_node_names)
1751

    
1752
    self.my_inst_info = dict((name, self.all_inst_info[name])
1753
                             for name in self.my_inst_names)
1754

    
1755
    # We detect here the nodes that will need the extra RPC calls for verifying
1756
    # split LV volumes; they should be locked.
1757
    extra_lv_nodes = set()
1758

    
1759
    for inst in self.my_inst_info.values():
1760
      if inst.disk_template in constants.DTS_INT_MIRROR:
1761
        group = self.my_node_info[inst.primary_node].group
1762
        for nname in inst.secondary_nodes:
1763
          if self.all_node_info[nname].group != group:
1764
            extra_lv_nodes.add(nname)
1765

    
1766
    unlocked_lv_nodes = \
1767
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1768

    
1769
    if unlocked_lv_nodes:
1770
      raise errors.OpPrereqError("these nodes could be locked: %s" %
1771
                                 utils.CommaJoin(unlocked_lv_nodes))
1772
    self.extra_lv_nodes = list(extra_lv_nodes)
1773

    
1774
  def _VerifyNode(self, ninfo, nresult):
1775
    """Perform some basic validation on data returned from a node.
1776

1777
      - check the result data structure is well formed and has all the
1778
        mandatory fields
1779
      - check ganeti version
1780

1781
    @type ninfo: L{objects.Node}
1782
    @param ninfo: the node to check
1783
    @param nresult: the results from the node
1784
    @rtype: boolean
1785
    @return: whether overall this call was successful (and we can expect
1786
         reasonable values in the response)
1787

1788
    """
1789
    node = ninfo.name
1790
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1791

    
1792
    # main result, nresult should be a non-empty dict
1793
    test = not nresult or not isinstance(nresult, dict)
1794
    _ErrorIf(test, constants.CV_ENODERPC, node,
1795
                  "unable to verify node: no data returned")
1796
    if test:
1797
      return False
1798

    
1799
    # compares ganeti version
1800
    local_version = constants.PROTOCOL_VERSION
1801
    remote_version = nresult.get("version", None)
1802
    test = not (remote_version and
1803
                isinstance(remote_version, (list, tuple)) and
1804
                len(remote_version) == 2)
1805
    _ErrorIf(test, constants.CV_ENODERPC, node,
1806
             "connection to node returned invalid data")
1807
    if test:
1808
      return False
1809

    
1810
    test = local_version != remote_version[0]
1811
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1812
             "incompatible protocol versions: master %s,"
1813
             " node %s", local_version, remote_version[0])
1814
    if test:
1815
      return False
1816

    
1817
    # node seems compatible, we can actually try to look into its results
1818

    
1819
    # full package version
1820
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1821
                  constants.CV_ENODEVERSION, node,
1822
                  "software version mismatch: master %s, node %s",
1823
                  constants.RELEASE_VERSION, remote_version[1],
1824
                  code=self.ETYPE_WARNING)
1825

    
1826
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1827
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1828
      for hv_name, hv_result in hyp_result.iteritems():
1829
        test = hv_result is not None
1830
        _ErrorIf(test, constants.CV_ENODEHV, node,
1831
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1832

    
1833
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1834
    if ninfo.vm_capable and isinstance(hvp_result, list):
1835
      for item, hv_name, hv_result in hvp_result:
1836
        _ErrorIf(True, constants.CV_ENODEHV, node,
1837
                 "hypervisor %s parameter verify failure (source %s): %s",
1838
                 hv_name, item, hv_result)
1839

    
1840
    test = nresult.get(constants.NV_NODESETUP,
1841
                       ["Missing NODESETUP results"])
1842
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1843
             "; ".join(test))
1844

    
1845
    return True
1846

    
1847
  def _VerifyNodeTime(self, ninfo, nresult,
1848
                      nvinfo_starttime, nvinfo_endtime):
1849
    """Check the node time.
1850

1851
    @type ninfo: L{objects.Node}
1852
    @param ninfo: the node to check
1853
    @param nresult: the remote results for the node
1854
    @param nvinfo_starttime: the start time of the RPC call
1855
    @param nvinfo_endtime: the end time of the RPC call
1856

1857
    """
1858
    node = ninfo.name
1859
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1860

    
1861
    ntime = nresult.get(constants.NV_TIME, None)
1862
    try:
1863
      ntime_merged = utils.MergeTime(ntime)
1864
    except (ValueError, TypeError):
1865
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1866
      return
1867

    
1868
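    # The node's reported time is checked against the [start, end] window of
    # the verify RPC, widened by NODE_MAX_CLOCK_SKEW on each side; only a
    # divergence beyond that window is reported below.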
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1869
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1870
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1871
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1872
    else:
1873
      ntime_diff = None
1874

    
1875
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1876
             "Node time diverges by at least %s from master node time",
1877
             ntime_diff)
1878

    
1879
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1880
    """Check the node LVM results.
1881

1882
    @type ninfo: L{objects.Node}
1883
    @param ninfo: the node to check
1884
    @param nresult: the remote results for the node
1885
    @param vg_name: the configured VG name
1886

1887
    """
1888
    if vg_name is None:
1889
      return
1890

    
1891
    node = ninfo.name
1892
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1893

    
1894
    # checks vg existence and size > 20G
1895
    vglist = nresult.get(constants.NV_VGLIST, None)
1896
    test = not vglist
1897
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1898
    if not test:
1899
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1900
                                            constants.MIN_VG_SIZE)
1901
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1902

    
1903
    # check pv names
1904
    pvlist = nresult.get(constants.NV_PVLIST, None)
1905
    test = pvlist is None
1906
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
1907
    if not test:
1908
      # check that ':' is not present in PV names, since it's a
1909
      # special character for lvcreate (denotes the range of PEs to
1910
      # use on the PV)
1911
      for _, pvname, owner_vg in pvlist:
1912
        test = ":" in pvname
1913
        _ErrorIf(test, constants.CV_ENODELVM, node,
1914
                 "Invalid character ':' in PV '%s' of VG '%s'",
1915
                 pvname, owner_vg)
1916

    
1917
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1918
    """Check the node bridges.
1919

1920
    @type ninfo: L{objects.Node}
1921
    @param ninfo: the node to check
1922
    @param nresult: the remote results for the node
1923
    @param bridges: the expected list of bridges
1924

1925
    """
1926
    if not bridges:
1927
      return
1928

    
1929
    node = ninfo.name
1930
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1931

    
1932
    missing = nresult.get(constants.NV_BRIDGES, None)
1933
    test = not isinstance(missing, list)
1934
    _ErrorIf(test, constants.CV_ENODENET, node,
1935
             "did not return valid bridge information")
1936
    if not test:
1937
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1938
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1939

    
1940
  def _VerifyNodeNetwork(self, ninfo, nresult):
1941
    """Check the node network connectivity results.
1942

1943
    @type ninfo: L{objects.Node}
1944
    @param ninfo: the node to check
1945
    @param nresult: the remote results for the node
1946

1947
    """
1948
    node = ninfo.name
1949
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1950

    
1951
    test = constants.NV_NODELIST not in nresult
1952
    _ErrorIf(test, constants.CV_ENODESSH, node,
1953
             "node hasn't returned node ssh connectivity data")
1954
    if not test:
1955
      if nresult[constants.NV_NODELIST]:
1956
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1957
          _ErrorIf(True, constants.CV_ENODESSH, node,
1958
                   "ssh communication with node '%s': %s", a_node, a_msg)
1959

    
1960
    test = constants.NV_NODENETTEST not in nresult
1961
    _ErrorIf(test, constants.CV_ENODENET, node,
1962
             "node hasn't returned node tcp connectivity data")
1963
    if not test:
1964
      if nresult[constants.NV_NODENETTEST]:
1965
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1966
        for anode in nlist:
1967
          _ErrorIf(True, constants.CV_ENODENET, node,
1968
                   "tcp communication with node '%s': %s",
1969
                   anode, nresult[constants.NV_NODENETTEST][anode])
1970

    
1971
    test = constants.NV_MASTERIP not in nresult
1972
    _ErrorIf(test, constants.CV_ENODENET, node,
1973
             "node hasn't returned node master IP reachability data")
1974
    if not test:
1975
      if not nresult[constants.NV_MASTERIP]:
1976
        if node == self.master_node:
1977
          msg = "the master node cannot reach the master IP (not configured?)"
1978
        else:
1979
          msg = "cannot reach the master IP"
1980
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1981

    
1982
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1983
                      diskstatus):
1984
    """Verify an instance.
1985

1986
    This function checks to see if the required block devices are
1987
    available on the instance's node.
1988

1989
    """
1990
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1991
    node_current = instanceconfig.primary_node
1992

    
1993
    node_vol_should = {}
1994
    instanceconfig.MapLVsByNode(node_vol_should)
1995

    
1996
    for node in node_vol_should:
1997
      n_img = node_image[node]
1998
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1999
        # ignore missing volumes on offline or broken nodes
2000
        continue
2001
      for volume in node_vol_should[node]:
2002
        test = volume not in n_img.volumes
2003
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
2004
                 "volume %s missing on node %s", volume, node)
2005

    
2006
    if instanceconfig.admin_up:
2007
      pri_img = node_image[node_current]
2008
      test = instance not in pri_img.instances and not pri_img.offline
2009
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2010
               "instance not running on its primary node %s",
2011
               node_current)
2012

    
2013
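    # Flatten diskstatus ({node: [(success, status), ...]}) into
    # (node, success, status, disk-index) tuples for the checks below.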
    diskdata = [(nname, success, status, idx)
2014
                for (nname, disks) in diskstatus.items()
2015
                for idx, (success, status) in enumerate(disks)]
2016

    
2017
    for nname, success, bdev_status, idx in diskdata:
2018
      # the 'ghost node' construction in Exec() ensures that we have a
2019
      # node here
2020
      snode = node_image[nname]
2021
      bad_snode = snode.ghost or snode.offline
2022
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
2023
               constants.CV_EINSTANCEFAULTYDISK, instance,
2024
               "couldn't retrieve status for disk/%s on %s: %s",
2025
               idx, nname, bdev_status)
2026
      _ErrorIf((instanceconfig.admin_up and success and
2027
                bdev_status.ldisk_status == constants.LDS_FAULTY),
2028
               constants.CV_EINSTANCEFAULTYDISK, instance,
2029
               "disk/%s on %s is faulty", idx, nname)
2030

    
2031
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2032
    """Verify if there are any unknown volumes in the cluster.
2033

2034
    The .os, .swap and backup volumes are ignored. All other volumes are
2035
    reported as unknown.
2036

2037
    @type reserved: L{ganeti.utils.FieldSet}
2038
    @param reserved: a FieldSet of reserved volume names
2039

2040
    """
2041
    for node, n_img in node_image.items():
2042
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2043
        # skip non-healthy nodes
2044
        continue
2045
      for volume in n_img.volumes:
2046
        test = ((node not in node_vol_should or
2047
                volume not in node_vol_should[node]) and
2048
                not reserved.Matches(volume))
2049
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2050
                      "volume %s is unknown", volume)
2051

    
2052
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2053
    """Verify N+1 Memory Resilience.
2054

2055
    Check that if one single node dies we can still start all the
2056
    instances it was primary for.
2057

2058
    """
2059
    cluster_info = self.cfg.GetClusterInfo()
2060
    for node, n_img in node_image.items():
2061
      # This code checks that every node which is now listed as
2062
      # secondary has enough memory to host all instances it is
2063
      # supposed to should a single other node in the cluster fail.
2064
      # FIXME: not ready for failover to an arbitrary node
2065
      # FIXME: does not support file-backed instances
2066
      # WARNING: we currently take into account down instances as well
2067
      # as up ones, considering that even if they're down someone
2068
      # might want to start them even in the event of a node failure.
2069
      if n_img.offline:
2070
        # we're skipping offline nodes from the N+1 warning, since
2071
        # most likely we don't have good memory information from them;
2072
        # we already list instances living on such nodes, and that's
2073
        # enough warning
2074
        continue
2075
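      # n_img.sbp maps each primary node to the instances for which this
      # node acts as secondary; to be N+1 safe the node needs enough free
      # memory to take over all instances of any single failing primary.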
      for prinode, instances in n_img.sbp.items():
2076
        needed_mem = 0
2077
        for instance in instances:
2078
          bep = cluster_info.FillBE(instance_cfg[instance])
2079
          if bep[constants.BE_AUTO_BALANCE]:
2080
            needed_mem += bep[constants.BE_MEMORY]
2081
        test = n_img.mfree < needed_mem
2082
        self._ErrorIf(test, constants.CV_ENODEN1, node,
2083
                      "not enough memory to accomodate instance failovers"
2084
                      " should node %s fail (%dMiB needed, %dMiB available)",
2085
                      prinode, needed_mem, n_img.mfree)
2086

    
2087
  @classmethod
2088
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2089
                   (files_all, files_opt, files_mc, files_vm)):
2090
    """Verifies file checksums collected from all nodes.
2091

2092
    @param errorif: Callback for reporting errors
2093
    @param nodeinfo: List of L{objects.Node} objects
2094
    @param master_node: Name of master node
2095
    @param all_nvinfo: RPC results
2096

2097
    """
2098
    # Define functions determining which nodes to consider for a file
2099
    files2nodefn = [
2100
      (files_all, None),
2101
      (files_mc, lambda node: (node.master_candidate or
2102
                               node.name == master_node)),
2103
      (files_vm, lambda node: node.vm_capable),
2104
      ]
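    # files_opt is not mapped here on purpose: it only marks which of the
    # files above are optional and is handled separately (all-or-none
    # check) further down.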
2105

    
2106
    # Build mapping from filename to list of nodes which should have the file
2107
    nodefiles = {}
2108
    for (files, fn) in files2nodefn:
2109
      if fn is None:
2110
        filenodes = nodeinfo
2111
      else:
2112
        filenodes = filter(fn, nodeinfo)
2113
      nodefiles.update((filename,
2114
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2115
                       for filename in files)
2116

    
2117
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2118

    
2119
    fileinfo = dict((filename, {}) for filename in nodefiles)
2120
    ignore_nodes = set()
2121

    
2122
    for node in nodeinfo:
2123
      if node.offline:
2124
        ignore_nodes.add(node.name)
2125
        continue
2126

    
2127
      nresult = all_nvinfo[node.name]
2128

    
2129
      if nresult.fail_msg or not nresult.payload:
2130
        node_files = None
2131
      else:
2132
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2133

    
2134
      test = not (node_files and isinstance(node_files, dict))
2135
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
2136
              "Node did not return file checksum data")
2137
      if test:
2138
        ignore_nodes.add(node.name)
2139
        continue
2140

    
2141
      # Build per-checksum mapping from filename to nodes having it
2142
      for (filename, checksum) in node_files.items():
2143
        assert filename in nodefiles
2144
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2145

    
2146
    for (filename, checksums) in fileinfo.items():
2147
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2148

    
2149
      # Nodes having the file
2150
      with_file = frozenset(node_name
2151
                            for nodes in fileinfo[filename].values()
2152
                            for node_name in nodes) - ignore_nodes
2153

    
2154
      expected_nodes = nodefiles[filename] - ignore_nodes
2155

    
2156
      # Nodes missing file
2157
      missing_file = expected_nodes - with_file
2158

    
2159
      if filename in files_opt:
2160
        # All or no nodes
2161
        errorif(missing_file and missing_file != expected_nodes,
2162
                constants.CV_ECLUSTERFILECHECK, None,
2163
                "File %s is optional, but it must exist on all or no"
2164
                " nodes (not found on %s)",
2165
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2166
      else:
2167
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2168
                "File %s is missing from node(s) %s", filename,
2169
                utils.CommaJoin(utils.NiceSort(missing_file)))
2170

    
2171
        # Warn if a node has a file it shouldn't
2172
        unexpected = with_file - expected_nodes
2173
        errorif(unexpected,
2174
                constants.CV_ECLUSTERFILECHECK, None,
2175
                "File %s should not exist on node(s) %s",
2176
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2177

    
2178
      # See if there are multiple versions of the file
2179
      test = len(checksums) > 1
2180
      if test:
2181
        variants = ["variant %s on %s" %
2182
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2183
                    for (idx, (checksum, nodes)) in
2184
                      enumerate(sorted(checksums.items()))]
2185
      else:
2186
        variants = []
2187

    
2188
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2189
              "File %s found with %s different checksums (%s)",
2190
              filename, len(checksums), "; ".join(variants))
2191

    
2192
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2193
                      drbd_map):
2194
    """Verifies and the node DRBD status.
2195

2196
    @type ninfo: L{objects.Node}
2197
    @param ninfo: the node to check
2198
    @param nresult: the remote results for the node
2199
    @param instanceinfo: the dict of instances
2200
    @param drbd_helper: the configured DRBD usermode helper
2201
    @param drbd_map: the DRBD map as returned by
2202
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2203

2204
    """
2205
    node = ninfo.name
2206
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2207

    
2208
    if drbd_helper:
2209
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2210
      test = (helper_result is None)
2211
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2212
               "no drbd usermode helper returned")
2213
      if helper_result:
2214
        status, payload = helper_result
2215
        test = not status
2216
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2217
                 "drbd usermode helper check unsuccessful: %s", payload)
2218
        test = status and (payload != drbd_helper)
2219
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2220
                 "wrong drbd usermode helper: %s", payload)
2221

    
2222
    # compute the DRBD minors
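    # node_drbd: minor -> (instance name, whether the minor must be active,
    # i.e. whether the owning instance is administratively up)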
2223
    node_drbd = {}
2224
    for minor, instance in drbd_map[node].items():
2225
      test = instance not in instanceinfo
2226
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2227
               "ghost instance '%s' in temporary DRBD map", instance)
2228
        # ghost instance should not be running, but otherwise we
2229
        # don't give double warnings (both ghost instance and
2230
        # unallocated minor in use)
2231
      if test:
2232
        node_drbd[minor] = (instance, False)
2233
      else:
2234
        instance = instanceinfo[instance]
2235
        node_drbd[minor] = (instance.name, instance.admin_up)
2236

    
2237
    # and now check them
2238
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2239
    test = not isinstance(used_minors, (tuple, list))
2240
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2241
             "cannot parse drbd status file: %s", str(used_minors))
2242
    if test:
2243
      # we cannot check drbd status
2244
      return
2245

    
2246
    for minor, (iname, must_exist) in node_drbd.items():
2247
      test = minor not in used_minors and must_exist
2248
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2249
               "drbd minor %d of instance %s is not active", minor, iname)
2250
    for minor in used_minors:
2251
      test = minor not in node_drbd
2252
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2253
               "unallocated drbd minor %d is in use", minor)
2254

    
2255
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2256
    """Builds the node OS structures.
2257

2258
    @type ninfo: L{objects.Node}
2259
    @param ninfo: the node to check
2260
    @param nresult: the remote results for the node
2261
    @param nimg: the node image object
2262

2263
    """
2264
    node = ninfo.name
2265
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2266

    
2267
    remote_os = nresult.get(constants.NV_OSLIST, None)
2268
    test = (not isinstance(remote_os, list) or
2269
            not compat.all(isinstance(v, list) and len(v) == 7
2270
                           for v in remote_os))
2271

    
2272
    _ErrorIf(test, constants.CV_ENODEOS, node,
2273
             "node hasn't returned valid OS data")
2274

    
2275
    nimg.os_fail = test
2276

    
2277
    if test:
2278
      return
2279

    
2280
    os_dict = {}
2281

    
2282
    for (name, os_path, status, diagnose,
2283
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2284

    
2285
      if name not in os_dict:
2286
        os_dict[name] = []
2287

    
2288
      # parameters is a list of lists instead of list of tuples due to
2289
      # JSON lacking a real tuple type, fix it:
2290
      parameters = [tuple(v) for v in parameters]
2291
      os_dict[name].append((os_path, status, diagnose,
2292
                            set(variants), set(parameters), set(api_ver)))
2293

    
2294
    nimg.oslist = os_dict
2295

    
2296
  def _VerifyNodeOS(self, ninfo, nimg, base):
2297
    """Verifies the node OS list.
2298

2299
    @type ninfo: L{objects.Node}
2300
    @param ninfo: the node to check
2301
    @param nimg: the node image object
2302
    @param base: the 'template' node we match against (e.g. from the master)
2303

2304
    """
2305
    node = ninfo.name
2306
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2307

    
2308
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2309

    
2310
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2311
    for os_name, os_data in nimg.oslist.items():
2312
      assert os_data, "Empty OS status for OS %s?!" % os_name
2313
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2314
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2315
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2316
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2317
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2318
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2319
      # comparisons with the 'base' image
2320
      test = os_name not in base.oslist
2321
      _ErrorIf(test, constants.CV_ENODEOS, node,
2322
               "Extra OS %s not present on reference node (%s)",
2323
               os_name, base.name)
2324
      if test:
2325
        continue
2326
      assert base.oslist[os_name], "Base node has empty OS status?"
2327
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2328
      if not b_status:
2329
        # base OS is invalid, skipping
2330
        continue
2331
      for kind, a, b in [("API version", f_api, b_api),
2332
                         ("variants list", f_var, b_var),
2333
                         ("parameters", beautify_params(f_param),
2334
                          beautify_params(b_param))]:
2335
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2336
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2337
                 kind, os_name, base.name,
2338
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2339

    
2340
    # check any missing OSes
2341
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2342
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2343
             "OSes present on reference node %s but missing on this node: %s",
2344
             base.name, utils.CommaJoin(missing))
2345

    
2346
  def _VerifyOob(self, ninfo, nresult):
2347
    """Verifies out of band functionality of a node.
2348

2349
    @type ninfo: L{objects.Node}
2350
    @param ninfo: the node to check
2351
    @param nresult: the remote results for the node
2352

2353
    """
2354
    node = ninfo.name
2355
    # We just have to verify the paths on master and/or master candidates
2356
    # as the oob helper is invoked on the master
2357
    if ((ninfo.master_candidate or ninfo.master_capable) and
2358
        constants.NV_OOB_PATHS in nresult):
2359
      for path_result in nresult[constants.NV_OOB_PATHS]:
2360
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2361

    
2362
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2363
    """Verifies and updates the node volume data.
2364

2365
    This function will update a L{NodeImage}'s internal structures
2366
    with data from the remote call.
2367

2368
    @type ninfo: L{objects.Node}
2369
    @param ninfo: the node to check
2370
    @param nresult: the remote results for the node
2371
    @param nimg: the node image object
2372
    @param vg_name: the configured VG name
2373

2374
    """
2375
    node = ninfo.name
2376
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2377

    
2378
    nimg.lvm_fail = True
2379
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2380
    if vg_name is None:
2381
      pass
2382
    elif isinstance(lvdata, basestring):
2383
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2384
               utils.SafeEncode(lvdata))
2385
    elif not isinstance(lvdata, dict):
2386
      _ErrorIf(True, constants.CV_ENODELVM, node,
2387
               "rpc call to node failed (lvlist)")
2388
    else:
2389
      nimg.volumes = lvdata
2390
      nimg.lvm_fail = False
2391

    
2392
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2393
    """Verifies and updates the node instance list.
2394

2395
    If the listing was successful, then updates this node's instance
2396
    list. Otherwise, it marks the RPC call as failed for the instance
2397
    list key.
2398

2399
    @type ninfo: L{objects.Node}
2400
    @param ninfo: the node to check
2401
    @param nresult: the remote results for the node
2402
    @param nimg: the node image object
2403

2404
    """
2405
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2406
    test = not isinstance(idata, list)
2407
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2408
                  "rpc call to node failed (instancelist): %s",
2409
                  utils.SafeEncode(str(idata)))
2410
    if test:
2411
      nimg.hyp_fail = True
2412
    else:
2413
      nimg.instances = idata
2414

    
2415
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2416
    """Verifies and computes a node information map
2417

2418
    @type ninfo: L{objects.Node}
2419
    @param ninfo: the node to check
2420
    @param nresult: the remote results for the node
2421
    @param nimg: the node image object
2422
    @param vg_name: the configured VG name
2423

2424
    """
2425
    node = ninfo.name
2426
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2427

    
2428
    # try to read free memory (from the hypervisor)
2429
    hv_info = nresult.get(constants.NV_HVINFO, None)
2430
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2431
    _ErrorIf(test, constants.CV_ENODEHV, node,
2432
             "rpc call to node failed (hvinfo)")
2433
    if not test:
2434
      try:
2435
        nimg.mfree = int(hv_info["memory_free"])
2436
      except (ValueError, TypeError):
2437
        _ErrorIf(True, constants.CV_ENODERPC, node,
2438
                 "node returned invalid nodeinfo, check hypervisor")
2439

    
2440
    # FIXME: devise a free space model for file based instances as well
2441
    if vg_name is not None:
2442
      test = (constants.NV_VGLIST not in nresult or
2443
              vg_name not in nresult[constants.NV_VGLIST])
2444
      _ErrorIf(test, constants.CV_ENODELVM, node,
2445
               "node didn't return data for the volume group '%s'"
2446
               " - it is either missing or broken", vg_name)
2447
      if not test:
2448
        try:
2449
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2450
        except (ValueError, TypeError):
2451
          _ErrorIf(True, constants.CV_ENODERPC, node,
2452
                   "node returned invalid LVM info, check LVM status")
2453

    
2454
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2455
    """Gets per-disk status information for all instances.
2456

2457
    @type nodelist: list of strings
2458
    @param nodelist: Node names
2459
    @type node_image: dict of (name, L{objects.Node})
2460
    @param node_image: Node objects
2461
    @type instanceinfo: dict of (name, L{objects.Instance})
2462
    @param instanceinfo: Instance objects
2463
    @rtype: {instance: {node: [(success, payload)]}}
2464
    @return: a dictionary of per-instance dictionaries with nodes as
2465
        keys and disk information as values; the disk information is a
2466
        list of tuples (success, payload)
2467

2468
    """
2469
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2470

    
2471
    node_disks = {}
2472
    node_disks_devonly = {}
2473
    diskless_instances = set()
2474
    diskless = constants.DT_DISKLESS
2475

    
2476
    for nname in nodelist:
2477
      node_instances = list(itertools.chain(node_image[nname].pinst,
2478
                                            node_image[nname].sinst))
2479
      diskless_instances.update(inst for inst in node_instances
2480
                                if instanceinfo[inst].disk_template == diskless)
2481
      disks = [(inst, disk)
2482
               for inst in node_instances
2483
               for disk in instanceinfo[inst].disks]
2484

    
2485
      if not disks:
2486
        # No need to collect data
2487
        continue
2488

    
2489
      node_disks[nname] = disks
2490

    
2491
      # Creating copies as SetDiskID below will modify the objects and that can
2492
      # lead to incorrect data returned from nodes
2493
      devonly = [dev.Copy() for (_, dev) in disks]
2494

    
2495
      for dev in devonly:
2496
        self.cfg.SetDiskID(dev, nname)
2497

    
2498
      node_disks_devonly[nname] = devonly
2499

    
2500
    assert len(node_disks) == len(node_disks_devonly)
2501

    
2502
    # Collect data from all nodes with disks
2503
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2504
                                                          node_disks_devonly)
2505

    
2506
    assert len(result) == len(node_disks)
2507

    
2508
    instdisk = {}
2509

    
2510
    for (nname, nres) in result.items():
2511
      disks = node_disks[nname]
2512

    
2513
      if nres.offline:
2514
        # No data from this node
2515
        data = len(disks) * [(False, "node offline")]
2516
      else:
2517
        msg = nres.fail_msg
2518
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2519
                 "while getting disk information: %s", msg)
2520
        if msg:
2521
          # No data from this node
2522
          data = len(disks) * [(False, msg)]
2523
        else:
2524
          data = []
2525
          for idx, i in enumerate(nres.payload):
2526
            if isinstance(i, (tuple, list)) and len(i) == 2:
2527
              data.append(i)
2528
            else:
2529
              logging.warning("Invalid result from node %s, entry %d: %s",
2530
                              nname, idx, i)
2531
              data.append((False, "Invalid result from the remote node"))
2532

    
2533
      for ((inst, _), status) in zip(disks, data):
2534
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2535

    
2536
    # Add empty entries for diskless instances.
2537
    for inst in diskless_instances:
2538
      assert inst not in instdisk
2539
      instdisk[inst] = {}
2540

    
2541
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2542
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2543
                      compat.all(isinstance(s, (tuple, list)) and
2544
                                 len(s) == 2 for s in statuses)
2545
                      for inst, nnames in instdisk.items()
2546
                      for nname, statuses in nnames.items())
2547
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2548

    
2549
    return instdisk
2550

    
2551
  @staticmethod
2552
  def _SshNodeSelector(group_uuid, all_nodes):
2553
    """Create endless iterators for all potential SSH check hosts.
2554

2555
    """
2556
    nodes = [node for node in all_nodes
2557
             if (node.group != group_uuid and
2558
                 not node.offline)]
2559
    keyfunc = operator.attrgetter("group")
2560

    
2561
    return map(itertools.cycle,
2562
               [sorted(map(operator.attrgetter("name"), names))
2563
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2564
                                                  keyfunc)])
2565

    
2566
  @classmethod
2567
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2568
    """Choose which nodes should talk to which other nodes.
2569

2570
    We will make nodes contact all nodes in their group, and one node from
2571
    every other group.
2572

2573
    @warning: This algorithm has a known issue if one node group is much
2574
      smaller than others (e.g. just one node). In such a case all other
2575
      nodes will talk to the single node.
2576

2577
    """
2578
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2579
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
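    # Each online node in this group gets one target from every foreign
    # group's iterator, so the SSH probes rotate over those groups' nodes.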
2580

    
2581
    return (online_nodes,
2582
            dict((name, sorted([i.next() for i in sel]))
2583
                 for name in online_nodes))
2584

    
2585
  def BuildHooksEnv(self):
2586
    """Build hooks env.
2587

2588
    Cluster-Verify hooks run only in the post phase; their failure is logged
2589
    in the verify output and makes the verification fail.
2590

2591
    """
2592
    env = {
2593
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2594
      }
2595

    
2596
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2597
               for node in self.my_node_info.values())
2598

    
2599
    return env
2600

    
2601
  def BuildHooksNodes(self):
2602
    """Build hooks nodes.
2603

2604
    """
2605
    return ([], self.my_node_names)
2606

    
2607
  def Exec(self, feedback_fn):
2608
    """Verify integrity of the node group, performing various test on nodes.
2609

2610
    """
2611
    # This method has too many local variables. pylint: disable=R0914
2612
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2613

    
2614
    if not self.my_node_names:
2615
      # empty node group
2616
      feedback_fn("* Empty node group, skipping verification")
2617
      return True
2618

    
2619
    self.bad = False
2620
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2621
    verbose = self.op.verbose
2622
    self._feedback_fn = feedback_fn
2623

    
2624
    vg_name = self.cfg.GetVGName()
2625
    drbd_helper = self.cfg.GetDRBDHelper()
2626
    cluster = self.cfg.GetClusterInfo()
2627
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2628
    hypervisors = cluster.enabled_hypervisors
2629
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2630

    
2631
    i_non_redundant = [] # Non redundant instances
2632
    i_non_a_balanced = [] # Non auto-balanced instances
2633
    n_offline = 0 # Count of offline nodes
2634
    n_drained = 0 # Count of nodes being drained
2635
    node_vol_should = {}
2636

    
2637
    # FIXME: verify OS list
2638

    
2639
    # File verification
2640
    filemap = _ComputeAncillaryFiles(cluster, False)
2641

    
2642
    # do local checksums
2643
    master_node = self.master_node = self.cfg.GetMasterNode()
2644
    master_ip = self.cfg.GetMasterIP()
2645

    
2646
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2647

    
2648
    node_verify_param = {
2649
      constants.NV_FILELIST:
2650
        utils.UniqueSequence(filename
2651
                             for files in filemap
2652
                             for filename in files),
2653
      constants.NV_NODELIST:
2654
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2655
                                  self.all_node_info.values()),
2656
      constants.NV_HYPERVISOR: hypervisors,
2657
      constants.NV_HVPARAMS:
2658
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2659
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2660
                                 for node in node_data_list
2661
                                 if not node.offline],
2662
      constants.NV_INSTANCELIST: hypervisors,
2663
      constants.NV_VERSION: None,
2664
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2665
      constants.NV_NODESETUP: None,
2666
      constants.NV_TIME: None,
2667
      constants.NV_MASTERIP: (master_node, master_ip),
2668
      constants.NV_OSLIST: None,
2669
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2670
      }
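    # Each NV_* key above asks the nodes to run one class of checks; the
    # corresponding entries of the reply are consumed by the _Verify*/
    # _UpdateNode* helpers further down.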
2671

    
2672
    if vg_name is not None:
2673
      node_verify_param[constants.NV_VGLIST] = None
2674
      node_verify_param[constants.NV_LVLIST] = vg_name
2675
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2676
      node_verify_param[constants.NV_DRBDLIST] = None
2677

    
2678
    if drbd_helper:
2679
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2680

    
2681
    # bridge checks
2682
    # FIXME: this needs to be changed per node-group, not cluster-wide
2683
    bridges = set()
2684
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2685
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2686
      bridges.add(default_nicpp[constants.NIC_LINK])
2687
    for instance in self.my_inst_info.values():
2688
      for nic in instance.nics:
2689
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2690
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2691
          bridges.add(full_nic[constants.NIC_LINK])
2692

    
2693
    if bridges:
2694
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2695

    
2696
    # Build our expected cluster state
2697
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2698
                                                 name=node.name,
2699
                                                 vm_capable=node.vm_capable))
2700
                      for node in node_data_list)
2701

    
2702
    # Gather OOB paths
2703
    oob_paths = []
2704
    for node in self.all_node_info.values():
2705
      path = _SupportsOob(self.cfg, node)
2706
      if path and path not in oob_paths:
2707
        oob_paths.append(path)
2708

    
2709
    if oob_paths:
2710
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2711

    
2712
    for instance in self.my_inst_names:
2713
      inst_config = self.my_inst_info[instance]
2714

    
2715
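      # Nodes referenced by the instance but outside this group get a
      # placeholder NodeImage; those missing from the cluster config
      # entirely are additionally flagged as "ghost" nodes.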
      for nname in inst_config.all_nodes:
2716
        if nname not in node_image:
2717
          gnode = self.NodeImage(name=nname)
2718
          gnode.ghost = (nname not in self.all_node_info)
2719
          node_image[nname] = gnode
2720

    
2721
      inst_config.MapLVsByNode(node_vol_should)
2722

    
2723
      pnode = inst_config.primary_node
2724
      node_image[pnode].pinst.append(instance)
2725

    
2726
      for snode in inst_config.secondary_nodes:
2727
        nimg = node_image[snode]
2728
        nimg.sinst.append(instance)
2729
        if pnode not in nimg.sbp:
2730
          nimg.sbp[pnode] = []
2731
        nimg.sbp[pnode].append(instance)
2732

    
2733
    # At this point, we have the in-memory data structures complete,
2734
    # except for the runtime information, which we'll gather next
2735

    
2736
    # Due to the way our RPC system works, exact response times cannot be
2737
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2738
    # time before and after executing the request, we can at least have a time
2739
    # window.
2740
    nvinfo_starttime = time.time()
2741
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2742
                                           node_verify_param,
2743
                                           self.cfg.GetClusterName())
2744
    nvinfo_endtime = time.time()
2745

    
2746
    if self.extra_lv_nodes and vg_name is not None:
2747
      extra_lv_nvinfo = \
2748
          self.rpc.call_node_verify(self.extra_lv_nodes,
2749
                                    {constants.NV_LVLIST: vg_name},
2750
                                    self.cfg.GetClusterName())
2751
    else:
2752
      extra_lv_nvinfo = {}
2753

    
2754
    all_drbd_map = self.cfg.ComputeDRBDMap()
2755

    
2756
    feedback_fn("* Gathering disk information (%s nodes)" %
2757
                len(self.my_node_names))
2758
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2759
                                     self.my_inst_info)
2760

    
2761
    feedback_fn("* Verifying configuration file consistency")
2762

    
2763
    # If not all nodes are being checked, we need to make sure the master node
2764
    # and a non-checked vm_capable node are in the list.
2765
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2766
    if absent_nodes:
2767
      vf_nvinfo = all_nvinfo.copy()
2768
      vf_node_info = list(self.my_node_info.values())
2769
      additional_nodes = []
2770
      if master_node not in self.my_node_info:
2771
        additional_nodes.append(master_node)
2772
        vf_node_info.append(self.all_node_info[master_node])
2773
      # Add the first vm_capable node we find which is not included
2774
      for node in absent_nodes:
2775
        nodeinfo = self.all_node_info[node]
2776
        if nodeinfo.vm_capable and not nodeinfo.offline:
2777
          additional_nodes.append(node)
2778
          vf_node_info.append(self.all_node_info[node])
2779
          break
2780
      key = constants.NV_FILELIST
2781
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2782
                                                 {key: node_verify_param[key]},
2783
                                                 self.cfg.GetClusterName()))
2784
    else:
2785
      vf_nvinfo = all_nvinfo
2786
      vf_node_info = self.my_node_info.values()
2787

    
2788
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2789

    
2790
    feedback_fn("* Verifying node status")
2791

    
2792
    refos_img = None
2793

    
2794
    for node_i in node_data_list:
2795
      node = node_i.name
2796
      nimg = node_image[node]
2797

    
2798
      if node_i.offline:
2799
        if verbose:
2800
          feedback_fn("* Skipping offline node %s" % (node,))
2801
        n_offline += 1
2802
        continue
2803

    
2804
      if node == master_node:
2805
        ntype = "master"
2806
      elif node_i.master_candidate:
2807
        ntype = "master candidate"
2808
      elif node_i.drained:
2809
        ntype = "drained"
2810
        n_drained += 1
2811
      else:
2812
        ntype = "regular"
2813
      if verbose:
2814
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2815

    
2816
      msg = all_nvinfo[node].fail_msg
2817
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2818
               msg)
2819
      if msg:
2820
        nimg.rpc_fail = True
2821
        continue
2822

    
2823
      nresult = all_nvinfo[node].payload
2824

    
2825
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2826
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2827
      self._VerifyNodeNetwork(node_i, nresult)
2828
      self._VerifyOob(node_i, nresult)
2829

    
2830
      if nimg.vm_capable:
2831
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2832
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2833
                             all_drbd_map)
2834

    
2835
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2836
        self._UpdateNodeInstances(node_i, nresult, nimg)
2837
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2838
        self._UpdateNodeOS(node_i, nresult, nimg)
2839

    
2840
        if not nimg.os_fail:
2841
          if refos_img is None:
2842
            refos_img = nimg
2843
          self._VerifyNodeOS(node_i, nimg, refos_img)
2844
        self._VerifyNodeBridges(node_i, nresult, bridges)
2845

    
2846
        # Check whether all running instances are primary for the node. (This
2847
        # can no longer be done from _VerifyInstance below, since some of the
2848
        # wrong instances could be from other node groups.)
2849
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2850

    
2851
        for inst in non_primary_inst:
2852
          test = inst in self.all_inst_info
2853
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2854
                   "instance should not run on node %s", node_i.name)
2855
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2856
                   "node is running unknown instance %s", inst)
2857

    
2858
    for node, result in extra_lv_nvinfo.items():
2859
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2860
                              node_image[node], vg_name)
2861

    
2862
    feedback_fn("* Verifying instance status")
2863
    for instance in self.my_inst_names:
2864
      if verbose:
2865
        feedback_fn("* Verifying instance %s" % instance)
2866
      inst_config = self.my_inst_info[instance]
2867
      self._VerifyInstance(instance, inst_config, node_image,
2868
                           instdisk[instance])
2869
      inst_nodes_offline = []
2870

    
2871
      pnode = inst_config.primary_node
2872
      pnode_img = node_image[pnode]
2873
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2874
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2875
               " primary node failed", instance)
2876

    
2877
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2878
               constants.CV_EINSTANCEBADNODE, instance,
2879
               "instance is marked as running and lives on offline node %s",
2880
               inst_config.primary_node)
2881

    
2882
      # If the instance is non-redundant we cannot survive losing its primary
2883
      # node, so we are not N+1 compliant. On the other hand we have no disk
2884
      # templates with more than one secondary so that situation is not well
2885
      # supported either.
2886
      # FIXME: does not support file-backed instances
2887
      if not inst_config.secondary_nodes:
2888
        i_non_redundant.append(instance)
2889

    
2890
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2891
               constants.CV_EINSTANCELAYOUT,
2892
               instance, "instance has multiple secondary nodes: %s",
2893
               utils.CommaJoin(inst_config.secondary_nodes),
2894
               code=self.ETYPE_WARNING)
2895

    
2896
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2897
        pnode = inst_config.primary_node
2898
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2899
        instance_groups = {}
2900

    
2901
        for node in instance_nodes:
2902
          instance_groups.setdefault(self.all_node_info[node].group,
2903
                                     []).append(node)
2904

    
2905
        pretty_list = [
2906
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2907
          # Sort so that we always list the primary node first.
2908
          for group, nodes in sorted(instance_groups.items(),
2909
                                     key=lambda (_, nodes): pnode in nodes,
2910
                                     reverse=True)]
2911

    
2912
        self._ErrorIf(len(instance_groups) > 1,
2913
                      constants.CV_EINSTANCESPLITGROUPS,
2914
                      instance, "instance has primary and secondary nodes in"
2915
                      " different groups: %s", utils.CommaJoin(pretty_list),
2916
                      code=self.ETYPE_WARNING)
2917

    
2918
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2919
        i_non_a_balanced.append(instance)
2920

    
2921
      for snode in inst_config.secondary_nodes:
2922
        s_img = node_image[snode]
2923
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2924
                 snode, "instance %s, connection to secondary node failed",
2925
                 instance)
2926

    
2927
        if s_img.offline:
2928
          inst_nodes_offline.append(snode)
2929

    
2930
      # warn that the instance lives on offline nodes
2931
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2932
               "instance has offline secondary node(s) %s",
2933
               utils.CommaJoin(inst_nodes_offline))
2934
      # ... or ghost/non-vm_capable nodes
2935
      for node in inst_config.all_nodes:
2936
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2937
                 instance, "instance lives on ghost node %s", node)
2938
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2939
                 instance, "instance lives on non-vm_capable node %s", node)
2940

    
2941
    feedback_fn("* Verifying orphan volumes")
2942
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2943

    
2944
    # We will get spurious "unknown volume" warnings if any node of this group
2945
    # is secondary for an instance whose primary is in another group. To avoid
2946
    # them, we find these instances and add their volumes to node_vol_should.
2947
    for inst in self.all_inst_info.values():
2948
      for secondary in inst.secondary_nodes:
2949
        if (secondary in self.my_node_info
2950
            and inst.name not in self.my_inst_info):
2951
          inst.MapLVsByNode(node_vol_should)
2952
          break
2953

    
2954
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2955

    
2956
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2957
      feedback_fn("* Verifying N+1 Memory redundancy")
2958
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2959

    
2960
    feedback_fn("* Other Notes")
2961
    if i_non_redundant:
2962
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2963
                  % len(i_non_redundant))
2964

    
2965
    if i_non_a_balanced:
2966
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2967
                  % len(i_non_a_balanced))
2968

    
2969
    if n_offline:
2970
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2971

    
2972
    if n_drained:
2973
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2974

    
2975
    return not self.bad
2976

    
2977
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2978
    """Analyze the post-hooks' result
2979

2980
    This method analyses the hook result, handles it, and sends some
2981
    nicely-formatted feedback back to the user.
2982

2983
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2984
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2985
    @param hooks_results: the results of the multi-node hooks rpc call
2986
    @param feedback_fn: function used send feedback back to the caller
2987
    @param lu_result: previous Exec result
2988
    @return: the new Exec result, based on the previous result
2989
        and hook results
2990

2991
    """
2992
    # We only really run POST phase hooks, only for non-empty groups,
2993
    # and are only interested in their results
2994
    if not self.my_node_names:
2995
      # empty node group
2996
      pass
2997
    elif phase == constants.HOOKS_PHASE_POST:
2998
      # Used to change hooks' output to proper indentation
2999
      feedback_fn("* Hooks Results")
3000
      assert hooks_results, "invalid result from hooks"
3001

    
3002
      for node_name in hooks_results:
3003
        res = hooks_results[node_name]
3004
        msg = res.fail_msg
3005
        test = msg and not res.offline
3006
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3007
                      "Communication failure in hooks execution: %s", msg)
3008
        if res.offline or msg:
3009
          # No need to investigate payload if node is offline or gave
3010
          # an error.
3011
          continue
3012
        for script, hkr, output in res.payload:
3013
          test = hkr == constants.HKR_FAIL
3014
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3015
                        "Script %s failed, output:", script)
3016
          if test:
3017
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3018
            feedback_fn("%s" % output)
3019
            lu_result = False
3020

    
3021
    return lu_result
3022

    
3023

    
3024
class LUClusterVerifyDisks(NoHooksLU):
3025
  """Verifies the cluster disks status.
3026

3027
  """
3028
  REQ_BGL = False
3029

    
3030
  def ExpandNames(self):
3031
    self.share_locks = _ShareAll()
3032
    self.needed_locks = {
3033
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3034
      }
3035

    
3036
  def Exec(self, feedback_fn):
3037
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3038

    
3039
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3040
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3041
                           for group in group_names])
3042

    
3043

    
3044
class LUGroupVerifyDisks(NoHooksLU):
3045
  """Verifies the status of all disks in a node group.
3046

3047
  """
3048
  REQ_BGL = False
3049

    
3050
  def ExpandNames(self):
3051
    # Raises errors.OpPrereqError on its own if group can't be found
3052
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3053

    
3054
    self.share_locks = _ShareAll()
3055
    self.needed_locks = {
3056
      locking.LEVEL_INSTANCE: [],
3057
      locking.LEVEL_NODEGROUP: [],
3058
      locking.LEVEL_NODE: [],
3059
      }
3060

    
3061
  def DeclareLocks(self, level):
3062
    if level == locking.LEVEL_INSTANCE:
3063
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3064

    
3065
      # Lock instances optimistically, needs verification once node and group
3066
      # locks have been acquired
3067
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3068
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3069

    
3070
    elif level == locking.LEVEL_NODEGROUP:
3071
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3072

    
3073
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3074
        set([self.group_uuid] +
3075
            # Lock all groups used by instances optimistically; this requires
3076
            # going via the node before it's locked, requiring verification
3077
            # later on
3078
            [group_uuid
3079
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3080
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3081

    
3082
    elif level == locking.LEVEL_NODE:
3083
      # This will only lock the nodes in the group to be verified which contain
3084
      # actual instances
3085
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3086
      self._LockInstancesNodes()
3087

    
3088
      # Lock all nodes in group to be verified
3089
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3090
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3091
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3092

    
3093
  def CheckPrereq(self):
3094
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3095
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3096
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3097

    
3098
    assert self.group_uuid in owned_groups
3099

    
3100
    # Check if locked instances are still correct
3101
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3102

    
3103
    # Get instance information
3104
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3105

    
3106
    # Check if node groups for locked instances are still correct
3107
    for (instance_name, inst) in self.instances.items():
3108
      assert owned_nodes.issuperset(inst.all_nodes), \
3109
        "Instance %s's nodes changed while we kept the lock" % instance_name
3110

    
3111
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3112
                                             owned_groups)
3113

    
3114
      assert self.group_uuid in inst_groups, \
3115
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3116

    
3117
  def Exec(self, feedback_fn):
3118
    """Verify integrity of cluster disks.
3119

3120
    @rtype: tuple of three items
3121
    @return: a tuple of (dict of node-to-node_error, list of instances
3122
        which need activate-disks, dict of instance: (node, volume) for
3123
        missing volumes
3124

3125
    """
3126
    res_nodes = {}
3127
    res_instances = set()
3128
    res_missing = {}
3129

    
3130
    nv_dict = _MapInstanceDisksToNodes([inst
3131
                                        for inst in self.instances.values()
3132
                                        if inst.admin_up])
3133

    
3134
    if nv_dict:
3135
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3136
                             set(self.cfg.GetVmCapableNodeList()))
3137

    
3138
      node_lvs = self.rpc.call_lv_list(nodes, [])
3139

    
3140
      for (node, node_res) in node_lvs.items():
3141
        if node_res.offline:
3142
          continue
3143

    
3144
        msg = node_res.fail_msg
3145
        if msg:
3146
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3147
          res_nodes[node] = msg
3148
          continue
3149

    
3150
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3151
          inst = nv_dict.pop((node, lv_name), None)
3152
          if not (lv_online or inst is None):
3153
            res_instances.add(inst)
3154

    
3155
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3156
      # better
3157
      for key, inst in nv_dict.iteritems():
3158
        res_missing.setdefault(inst, []).append(list(key))
3159

    
3160
    return (res_nodes, list(res_instances), res_missing)
3161

    
3162

    
3163
class LUClusterRepairDiskSizes(NoHooksLU):
3164
  """Verifies the cluster disks sizes.
3165

3166
  """
3167
  REQ_BGL = False
3168

    
3169
  def ExpandNames(self):
3170
    if self.op.instances:
3171
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3172
      self.needed_locks = {
3173
        locking.LEVEL_NODE: [],
3174
        locking.LEVEL_INSTANCE: self.wanted_names,
3175
        }
3176
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3177
    else:
3178
      self.wanted_names = None
3179
      self.needed_locks = {
3180
        locking.LEVEL_NODE: locking.ALL_SET,
3181
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3182
        }
3183
    self.share_locks = _ShareAll()
3184

    
3185
  def DeclareLocks(self, level):
3186
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
3187
      self._LockInstancesNodes(primary_only=True)
3188

    
3189
  def CheckPrereq(self):
3190
    """Check prerequisites.
3191

3192
    This only checks the optional instance list against the existing names.
3193

3194
    """
3195
    if self.wanted_names is None:
3196
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3197

    
3198
    self.wanted_instances = \
3199
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3200

    
3201
  def _EnsureChildSizes(self, disk):
3202
    """Ensure children of the disk have the needed disk size.
3203

3204
    This is valid mainly for DRBD8 and fixes an issue where the
3205
    children have smaller disk size.
3206

3207
    @param disk: an L{ganeti.objects.Disk} object
3208

3209
    """
3210
    if disk.dev_type == constants.LD_DRBD8:
3211
      assert disk.children, "Empty children for DRBD8?"
3212
      fchild = disk.children[0]
3213
      mismatch = fchild.size < disk.size
3214
      if mismatch:
3215
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3216
                     fchild.size, disk.size)
3217
        fchild.size = disk.size
3218

    
3219
      # and we recurse on this child only, not on the metadev
3220
      return self._EnsureChildSizes(fchild) or mismatch
3221
    else:
3222
      return False
3223

    
3224
  def Exec(self, feedback_fn):
3225
    """Verify the size of cluster disks.
3226

3227
    """
3228
    # TODO: check child disks too
3229
    # TODO: check differences in size between primary/secondary nodes
3230
    per_node_disks = {}
3231
    for instance in self.wanted_instances:
3232
      pnode = instance.primary_node
3233
      if pnode not in per_node_disks:
3234
        per_node_disks[pnode] = []
3235
      for idx, disk in enumerate(instance.disks):
3236
        per_node_disks[pnode].append((instance, idx, disk))
3237

    
3238
    changed = []
3239
    for node, dskl in per_node_disks.items():
3240
      newl = [v[2].Copy() for v in dskl]
3241
      for dsk in newl:
3242
        self.cfg.SetDiskID(dsk, node)
3243
      result = self.rpc.call_blockdev_getsize(node, newl)
3244
      if result.fail_msg:
3245
        self.LogWarning("Failure in blockdev_getsize call to node"
3246
                        " %s, ignoring", node)
3247
        continue
3248
      if len(result.payload) != len(dskl):
3249
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3250
                        " result.payload=%s", node, len(dskl), result.payload)
3251
        self.LogWarning("Invalid result from node %s, ignoring node results",
3252
                        node)
3253
        continue
3254
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3255
        if size is None:
3256
          self.LogWarning("Disk %d of instance %s did not return size"
3257
                          " information, ignoring", idx, instance.name)
3258
          continue
3259
        if not isinstance(size, (int, long)):
3260
          self.LogWarning("Disk %d of instance %s did not return valid"
3261
                          " size information, ignoring", idx, instance.name)
3262
          continue
3263
        size = size >> 20
3264
        if size != disk.size:
3265
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3266
                       " correcting: recorded %d, actual %d", idx,
3267
                       instance.name, disk.size, size)
3268
          disk.size = size
3269
          self.cfg.Update(instance, feedback_fn)
3270
          changed.append((instance.name, idx, size))
3271
        if self._EnsureChildSizes(disk):
3272
          self.cfg.Update(instance, feedback_fn)
3273
          changed.append((instance.name, idx, disk.size))
3274
    return changed
3275

    
3276

    
3277
class LUClusterRename(LogicalUnit):
3278
  """Rename the cluster.
3279

3280
  """
3281
  HPATH = "cluster-rename"
3282
  HTYPE = constants.HTYPE_CLUSTER
3283

    
3284
  def BuildHooksEnv(self):
3285
    """Build hooks env.
3286

3287
    """
3288
    return {
3289
      "OP_TARGET": self.cfg.GetClusterName(),
3290
      "NEW_NAME": self.op.name,
3291
      }
3292

    
3293
  def BuildHooksNodes(self):
3294
    """Build hooks nodes.
3295

3296
    """
3297
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3298

    
3299
  def CheckPrereq(self):
3300
    """Verify that the passed name is a valid one.
3301

3302
    """
3303
    hostname = netutils.GetHostname(name=self.op.name,
3304
                                    family=self.cfg.GetPrimaryIPFamily())
3305

    
3306
    new_name = hostname.name
3307
    self.ip = new_ip = hostname.ip
3308
    old_name = self.cfg.GetClusterName()
3309
    old_ip = self.cfg.GetMasterIP()
3310
    if new_name == old_name and new_ip == old_ip:
3311
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3312
                                 " cluster has changed",
3313
                                 errors.ECODE_INVAL)
3314
    if new_ip != old_ip:
3315
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3316
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3317
                                   " reachable on the network" %
3318
                                   new_ip, errors.ECODE_NOTUNIQUE)
3319

    
3320
    self.op.name = new_name
3321

    
3322
  def Exec(self, feedback_fn):
3323
    """Rename the cluster.
3324

3325
    """
3326
    clustername = self.op.name
3327
    new_ip = self.ip
3328

    
3329
    # shutdown the master IP
3330
    (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3331
    result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev,
3332
                                                     family)
3333
    result.Raise("Could not disable the master role")
3334

    
3335
    try:
3336
      cluster = self.cfg.GetClusterInfo()
3337
      cluster.cluster_name = clustername
3338
      cluster.master_ip = new_ip
3339
      self.cfg.Update(cluster, feedback_fn)
3340

    
3341
      # update the known hosts file
3342
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3343
      node_list = self.cfg.GetOnlineNodeList()
3344
      try:
3345
        node_list.remove(master)
3346
      except ValueError:
3347
        pass
3348
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3349
    finally:
3350
      result = self.rpc.call_node_activate_master_ip(master, new_ip, netmask,
3351
                                                     dev, family)
3352
      msg = result.fail_msg
3353
      if msg:
3354
        self.LogWarning("Could not re-enable the master role on"
3355
                        " the master, please restart manually: %s", msg)
3356

    
3357
    return clustername
3358

    
3359

    
3360
def _ValidateNetmask(cfg, netmask):
3361
  """Checks if a netmask is valid.
3362

3363
  @type cfg: L{config.ConfigWriter}
3364
  @param cfg: The cluster configuration
3365
  @type netmask: int
3366
  @param netmask: the netmask to be verified
3367
  @raise errors.OpPrereqError: if the validation fails
3368

3369
  """
3370
  ip_family = cfg.GetPrimaryIPFamily()
3371
  try:
3372
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
3373
  except errors.ProgrammerError:
3374
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
3375
                               ip_family)
3376
  if not ipcls.ValidateNetmask(netmask):
3377
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
3378
                                (netmask))
3379

    
3380

    
3381
class LUClusterSetParams(LogicalUnit):
3382
  """Change the parameters of the cluster.
3383

3384
  """
3385
  HPATH = "cluster-modify"
3386
  HTYPE = constants.HTYPE_CLUSTER
3387
  REQ_BGL = False
3388

    
3389
  def CheckArguments(self):
3390
    """Check parameters
3391

3392
    """
3393
    if self.op.uid_pool:
3394
      uidpool.CheckUidPool(self.op.uid_pool)
3395

    
3396
    if self.op.add_uids:
3397
      uidpool.CheckUidPool(self.op.add_uids)
3398

    
3399
    if self.op.remove_uids:
3400
      uidpool.CheckUidPool(self.op.remove_uids)
3401

    
3402
    if self.op.master_netmask is not None:
3403
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3404

    
3405
  def ExpandNames(self):
3406
    # FIXME: in the future maybe other cluster params won't require checking on
3407
    # all nodes to be modified.
3408
    self.needed_locks = {
3409
      locking.LEVEL_NODE: locking.ALL_SET,
3410
    }
3411
    self.share_locks[locking.LEVEL_NODE] = 1
3412

    
3413
  def BuildHooksEnv(self):
3414
    """Build hooks env.
3415

3416
    """
3417
    return {
3418
      "OP_TARGET": self.cfg.GetClusterName(),
3419
      "NEW_VG_NAME": self.op.vg_name,
3420
      }
3421

    
3422
  def BuildHooksNodes(self):
3423
    """Build hooks nodes.
3424

3425
    """
3426
    mn = self.cfg.GetMasterNode()
3427
    return ([mn], [mn])
3428

    
3429
  def CheckPrereq(self):
3430
    """Check prerequisites.
3431

3432
    This checks whether the given params don't conflict and
3433
    if the given volume group is valid.
3434

3435
    """
3436
    if self.op.vg_name is not None and not self.op.vg_name:
3437
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3438
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3439
                                   " instances exist", errors.ECODE_INVAL)
3440

    
3441
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3442
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3443
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3444
                                   " drbd-based instances exist",
3445
                                   errors.ECODE_INVAL)
3446

    
3447
    node_list = self.owned_locks(locking.LEVEL_NODE)
3448

    
3449
    # if vg_name not None, checks given volume group on all nodes
3450
    if self.op.vg_name:
3451
      vglist = self.rpc.call_vg_list(node_list)
3452
      for node in node_list:
3453
        msg = vglist[node].fail_msg
3454
        if msg:
3455
          # ignoring down node
3456
          self.LogWarning("Error while gathering data on node %s"
3457
                          " (ignoring node): %s", node, msg)
3458
          continue
3459
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3460
                                              self.op.vg_name,
3461
                                              constants.MIN_VG_SIZE)
3462
        if vgstatus:
3463
          raise errors.OpPrereqError("Error on node '%s': %s" %
3464
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3465

    
3466
    if self.op.drbd_helper:
3467
      # checks given drbd helper on all nodes
3468
      helpers = self.rpc.call_drbd_helper(node_list)
3469
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3470
        if ninfo.offline:
3471
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3472
          continue
3473
        msg = helpers[node].fail_msg
3474
        if msg:
3475
          raise errors.OpPrereqError("Error checking drbd helper on node"
3476
                                     " '%s': %s" % (node, msg),
3477
                                     errors.ECODE_ENVIRON)
3478
        node_helper = helpers[node].payload
3479
        if node_helper != self.op.drbd_helper:
3480
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3481
                                     (node, node_helper), errors.ECODE_ENVIRON)
3482

    
3483
    self.cluster = cluster = self.cfg.GetClusterInfo()
3484
    # validate params changes
3485
    if self.op.beparams:
3486
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3487
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3488

    
3489
    if self.op.ndparams:
3490
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3491
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3492

    
3493
      # TODO: we need a more general way to handle resetting
3494
      # cluster-level parameters to default values
3495
      if self.new_ndparams["oob_program"] == "":
3496
        self.new_ndparams["oob_program"] = \
3497
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3498

    
3499
    if self.op.nicparams:
3500
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3501
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3502
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3503
      nic_errors = []
3504

    
3505
      # check all instances for consistency
3506
      for instance in self.cfg.GetAllInstancesInfo().values():
3507
        for nic_idx, nic in enumerate(instance.nics):
3508
          params_copy = copy.deepcopy(nic.nicparams)
3509
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3510

    
3511
          # check parameter syntax
3512
          try:
3513
            objects.NIC.CheckParameterSyntax(params_filled)
3514
          except errors.ConfigurationError, err:
3515
            nic_errors.append("Instance %s, nic/%d: %s" %
3516
                              (instance.name, nic_idx, err))
3517

    
3518
          # if we're moving instances to routed, check that they have an ip
3519
          target_mode = params_filled[constants.NIC_MODE]
3520
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3521
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3522
                              " address" % (instance.name, nic_idx))
3523
      if nic_errors:
3524
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3525
                                   "\n".join(nic_errors))
3526

    
3527
    # hypervisor list/parameters
3528
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3529
    if self.op.hvparams:
3530
      for hv_name, hv_dict in self.op.hvparams.items():
3531
        if hv_name not in self.new_hvparams:
3532
          self.new_hvparams[hv_name] = hv_dict
3533
        else:
3534
          self.new_hvparams[hv_name].update(hv_dict)
3535

    
3536
    # os hypervisor parameters
3537
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3538
    if self.op.os_hvp:
3539
      for os_name, hvs in self.op.os_hvp.items():
3540
        if os_name not in self.new_os_hvp:
3541
          self.new_os_hvp[os_name] = hvs
3542
        else:
3543
          for hv_name, hv_dict in hvs.items():
3544
            if hv_name not in self.new_os_hvp[os_name]:
3545
              self.new_os_hvp[os_name][hv_name] = hv_dict
3546
            else:
3547
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3548

    
3549
    # os parameters
3550
    self.new_osp = objects.FillDict(cluster.osparams, {})
3551
    if self.op.osparams:
3552
      for os_name, osp in self.op.osparams.items():
3553
        if os_name not in self.new_osp:
3554
          self.new_osp[os_name] = {}
3555

    
3556
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3557
                                                  use_none=True)
3558

    
3559
        if not self.new_osp[os_name]:
3560
          # we removed all parameters
3561
          del self.new_osp[os_name]
3562
        else:
3563
          # check the parameter validity (remote check)
3564
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3565
                         os_name, self.new_osp[os_name])
3566

    
3567
    # changes to the hypervisor list
3568
    if self.op.enabled_hypervisors is not None:
3569
      self.hv_list = self.op.enabled_hypervisors
3570
      for hv in self.hv_list:
3571
        # if the hypervisor doesn't already exist in the cluster
3572
        # hvparams, we initialize it to empty, and then (in both
3573
        # cases) we make sure to fill the defaults, as we might not
3574
        # have a complete defaults list if the hypervisor wasn't
3575
        # enabled before
3576
        if hv not in new_hvp:
3577
          new_hvp[hv] = {}
3578
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3579
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3580
    else:
3581
      self.hv_list = cluster.enabled_hypervisors
3582

    
3583
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3584
      # either the enabled list has changed, or the parameters have, validate
3585
      for hv_name, hv_params in self.new_hvparams.items():
3586
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3587
            (self.op.enabled_hypervisors and
3588
             hv_name in self.op.enabled_hypervisors)):
3589
          # either this is a new hypervisor, or its parameters have changed
3590
          hv_class = hypervisor.GetHypervisor(hv_name)
3591
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3592
          hv_class.CheckParameterSyntax(hv_params)
3593
          _CheckHVParams(self, node_list, hv_name, hv_params)
3594

    
3595
    if self.op.os_hvp:
3596
      # no need to check any newly-enabled hypervisors, since the
3597
      # defaults have already been checked in the above code-block
3598
      for os_name, os_hvp in self.new_os_hvp.items():
3599
        for hv_name, hv_params in os_hvp.items():
3600
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3601
          # we need to fill in the new os_hvp on top of the actual hv_p
3602
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3603
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3604
          hv_class = hypervisor.GetHypervisor(hv_name)
3605
          hv_class.CheckParameterSyntax(new_osp)
3606
          _CheckHVParams(self, node_list, hv_name, new_osp)
3607

    
3608
    if self.op.default_iallocator:
3609
      alloc_script = utils.FindFile(self.op.default_iallocator,
3610
                                    constants.IALLOCATOR_SEARCH_PATH,
3611
                                    os.path.isfile)
3612
      if alloc_script is None:
3613
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3614
                                   " specified" % self.op.default_iallocator,
3615
                                   errors.ECODE_INVAL)
3616

    
3617
  def Exec(self, feedback_fn):
3618
    """Change the parameters of the cluster.
3619

3620
    """
3621
    if self.op.vg_name is not None:
3622
      new_volume = self.op.vg_name
3623
      if not new_volume:
3624
        new_volume = None
3625
      if new_volume != self.cfg.GetVGName():
3626
        self.cfg.SetVGName(new_volume)
3627
      else:
3628
        feedback_fn("Cluster LVM configuration already in desired"
3629
                    " state, not changing")
3630
    if self.op.drbd_helper is not None:
3631
      new_helper = self.op.drbd_helper
3632
      if not new_helper:
3633
        new_helper = None
3634
      if new_helper != self.cfg.GetDRBDHelper():
3635
        self.cfg.SetDRBDHelper(new_helper)
3636
      else:
3637
        feedback_fn("Cluster DRBD helper already in desired state,"
3638
                    " not changing")
3639
    if self.op.hvparams:
3640
      self.cluster.hvparams = self.new_hvparams
3641
    if self.op.os_hvp:
3642
      self.cluster.os_hvp = self.new_os_hvp
3643
    if self.op.enabled_hypervisors is not None:
3644
      self.cluster.hvparams = self.new_hvparams
3645
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3646
    if self.op.beparams:
3647
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3648
    if self.op.nicparams:
3649
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3650
    if self.op.osparams:
3651
      self.cluster.osparams = self.new_osp
3652
    if self.op.ndparams:
3653
      self.cluster.ndparams = self.new_ndparams
3654

    
3655
    if self.op.candidate_pool_size is not None:
3656
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3657
      # we need to update the pool size here, otherwise the save will fail
3658
      _AdjustCandidatePool(self, [])
3659

    
3660
    if self.op.maintain_node_health is not None:
3661
      self.cluster.maintain_node_health = self.op.maintain_node_health
3662

    
3663
    if self.op.prealloc_wipe_disks is not None:
3664
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3665

    
3666
    if self.op.add_uids is not None:
3667
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3668

    
3669
    if self.op.remove_uids is not None:
3670
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3671

    
3672
    if self.op.uid_pool is not None:
3673
      self.cluster.uid_pool = self.op.uid_pool
3674

    
3675
    if self.op.default_iallocator is not None:
3676
      self.cluster.default_iallocator = self.op.default_iallocator
3677

    
3678
    if self.op.reserved_lvs is not None:
3679
      self.cluster.reserved_lvs = self.op.reserved_lvs
3680

    
3681
    def helper_os(aname, mods, desc):
3682
      desc += " OS list"
3683
      lst = getattr(self.cluster, aname)
3684
      for key, val in mods:
3685
        if key == constants.DDM_ADD:
3686
          if val in lst:
3687
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3688
          else:
3689
            lst.append(val)
3690
        elif key == constants.DDM_REMOVE:
3691
          if val in lst:
3692
            lst.remove(val)
3693
          else:
3694
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3695
        else:
3696
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3697

    
3698
    if self.op.hidden_os:
3699
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3700

    
3701
    if self.op.blacklisted_os:
3702
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3703

    
3704
    if self.op.master_netdev:
3705
      (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3706
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3707
                  self.cluster.master_netdev)
3708
      result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev,
3709
                                                       family)
3710
      result.Raise("Could not disable the master ip")
3711
      feedback_fn("Changing master_netdev from %s to %s" %
3712
                  (dev, self.op.master_netdev))
3713
      self.cluster.master_netdev = self.op.master_netdev
3714

    
3715
    if self.op.master_netmask:
3716
      (master, ip, dev, old_netmask, _) = self.cfg.GetMasterNetworkParameters()
3717
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3718
      result = self.rpc.call_node_change_master_netmask(master, old_netmask,
3719
                                                        self.op.master_netmask,
3720
                                                        ip, dev)
3721
      if result.fail_msg:
3722
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3723
        self.LogWarning(msg)
3724
        feedback_fn(msg)
3725
      else:
3726
        self.cluster.master_netmask = self.op.master_netmask
3727

    
3728
    self.cfg.Update(self.cluster, feedback_fn)
3729

    
3730
    if self.op.master_netdev:
3731
      (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3732
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3733
                  self.op.master_netdev)
3734
      result = self.rpc.call_node_activate_master_ip(master, ip, netmask, dev,
3735
                                                     family)
3736
      if result.fail_msg:
3737
        self.LogWarning("Could not re-enable the master ip on"
3738
                        " the master, please restart manually: %s",
3739
                        result.fail_msg)
3740

    
3741

    
3742
def _UploadHelper(lu, nodes, fname):
3743
  """Helper for uploading a file and showing warnings.
3744

3745
  """
3746
  if os.path.exists(fname):
3747
    result = lu.rpc.call_upload_file(nodes, fname)
3748
    for to_node, to_result in result.items():
3749
      msg = to_result.fail_msg
3750
      if msg:
3751
        msg = ("Copy of file %s to node %s failed: %s" %
3752
               (fname, to_node, msg))
3753
        lu.proc.LogWarning(msg)
3754

    
3755

    
3756
def _ComputeAncillaryFiles(cluster, redist):
3757
  """Compute files external to Ganeti which need to be consistent.
3758

3759
  @type redist: boolean
3760
  @param redist: Whether to include files which need to be redistributed
3761

3762
  """
3763
  # Compute files for all nodes
3764
  files_all = set([
3765
    constants.SSH_KNOWN_HOSTS_FILE,
3766
    constants.CONFD_HMAC_KEY,
3767
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3768
    constants.SPICE_CERT_FILE,
3769
    constants.SPICE_CACERT_FILE,
3770
    constants.RAPI_USERS_FILE,
3771
    ])
3772

    
3773
  if not redist:
3774
    files_all.update(constants.ALL_CERT_FILES)
3775
    files_all.update(ssconf.SimpleStore().GetFileList())
3776
  else:
3777
    # we need to ship at least the RAPI certificate
3778
    files_all.add(constants.RAPI_CERT_FILE)
3779

    
3780
  if cluster.modify_etc_hosts:
3781
    files_all.add(constants.ETC_HOSTS)
3782

    
3783
  # Files which are optional, these must:
3784
  # - be present in one other category as well
3785
  # - either exist or not exist on all nodes of that category (mc, vm all)
3786
  files_opt = set([
3787
    constants.RAPI_USERS_FILE,
3788
    ])
3789

    
3790
  # Files which should only be on master candidates
3791
  files_mc = set()
3792
  if not redist:
3793
    files_mc.add(constants.CLUSTER_CONF_FILE)
3794

    
3795
  # Files which should only be on VM-capable nodes
3796
  files_vm = set(filename
3797
    for hv_name in cluster.enabled_hypervisors
3798
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3799

    
3800
  files_opt |= set(filename
3801
    for hv_name in cluster.enabled_hypervisors
3802
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3803

    
3804
  # Filenames in each category must be unique
3805
  all_files_set = files_all | files_mc | files_vm
3806
  assert (len(all_files_set) ==
3807
          sum(map(len, [files_all, files_mc, files_vm]))), \
3808
         "Found file listed in more than one file list"
3809

    
3810
  # Optional files must be present in one other category
3811
  assert all_files_set.issuperset(files_opt), \
3812
         "Optional file not in a different required list"
3813

    
3814
  return (files_all, files_opt, files_mc, files_vm)
3815

    
3816

    
3817
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3818
  """Distribute additional files which are part of the cluster configuration.
3819

3820
  ConfigWriter takes care of distributing the config and ssconf files, but
3821
  there are more files which should be distributed to all nodes. This function
3822
  makes sure those are copied.
3823

3824
  @param lu: calling logical unit
3825
  @param additional_nodes: list of nodes not in the config to distribute to
3826
  @type additional_vm: boolean
3827
  @param additional_vm: whether the additional nodes are vm-capable or not
3828

3829
  """
3830
  # Gather target nodes
3831
  cluster = lu.cfg.GetClusterInfo()
3832
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3833

    
3834
  online_nodes = lu.cfg.GetOnlineNodeList()
3835
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3836

    
3837
  if additional_nodes is not None:
3838
    online_nodes.extend(additional_nodes)
3839
    if additional_vm:
3840
      vm_nodes.extend(additional_nodes)
3841

    
3842
  # Never distribute to master node
3843
  for nodelist in [online_nodes, vm_nodes]:
3844
    if master_info.name in nodelist:
3845
      nodelist.remove(master_info.name)
3846

    
3847
  # Gather file lists
3848
  (files_all, _, files_mc, files_vm) = \
3849
    _ComputeAncillaryFiles(cluster, True)
3850

    
3851
  # Never re-distribute configuration file from here
3852
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3853
              constants.CLUSTER_CONF_FILE in files_vm)
3854
  assert not files_mc, "Master candidates not handled in this function"
3855

    
3856
  filemap = [
3857
    (online_nodes, files_all),
3858
    (vm_nodes, files_vm),
3859
    ]
3860

    
3861
  # Upload the files
3862
  for (node_list, files) in filemap:
3863
    for fname in files:
3864
      _UploadHelper(lu, node_list, fname)
3865

    
3866

    
3867
class LUClusterRedistConf(NoHooksLU):
3868
  """Force the redistribution of cluster configuration.
3869

3870
  This is a very simple LU.
3871

3872
  """
3873
  REQ_BGL = False
3874

    
3875
  def ExpandNames(self):
3876
    self.needed_locks = {
3877
      locking.LEVEL_NODE: locking.ALL_SET,
3878
    }
3879
    self.share_locks[locking.LEVEL_NODE] = 1
3880

    
3881
  def Exec(self, feedback_fn):
3882
    """Redistribute the configuration.
3883

3884
    """
3885
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3886
    _RedistributeAncillaryFiles(self)
3887

    
3888

    
3889
class LUClusterActivateMasterIp(NoHooksLU):
3890
  """Activate the master IP on the master node.
3891

3892
  """
3893
  def Exec(self, feedback_fn):
3894
    """Activate the master IP.
3895

3896
    """
3897
    (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3898
    self.rpc.call_node_activate_master_ip(master, ip, netmask, dev, family)
3899

    
3900

    
3901
class LUClusterDeactivateMasterIp(NoHooksLU):
3902
  """Deactivate the master IP on the master node.
3903

3904
  """
3905
  def Exec(self, feedback_fn):
3906
    """Deactivate the master IP.
3907

3908
    """
3909
    (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3910
    self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev, family)
3911

    
3912

    
3913
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3914
  """Sleep and poll for an instance's disk to sync.
3915

3916
  """
3917
  if not instance.disks or disks is not None and not disks:
3918
    return True
3919

    
3920
  disks = _ExpandCheckDisks(instance, disks)
3921

    
3922
  if not oneshot:
3923
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3924

    
3925
  node = instance.primary_node
3926

    
3927
  for dev in disks:
3928
    lu.cfg.SetDiskID(dev, node)
3929

    
3930
  # TODO: Convert to utils.Retry
3931

    
3932
  retries = 0
3933
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3934
  while True:
3935
    max_time = 0
3936
    done = True
3937
    cumul_degraded = False
3938
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3939
    msg = rstats.fail_msg
3940
    if msg:
3941
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3942
      retries += 1
3943
      if retries >= 10:
3944
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3945
                                 " aborting." % node)
3946
      time.sleep(6)
3947
      continue
3948
    rstats = rstats.payload
3949
    retries = 0
3950
    for i, mstat in enumerate(rstats):
3951
      if mstat is None:
3952
        lu.LogWarning("Can't compute data for node %s/%s",
3953
                           node, disks[i].iv_name)
3954
        continue
3955

    
3956
      cumul_degraded = (cumul_degraded or
3957
                        (mstat.is_degraded and mstat.sync_percent is None))
3958
      if mstat.sync_percent is not None:
3959
        done = False
3960
        if mstat.estimated_time is not None:
3961
          rem_time = ("%s remaining (estimated)" %
3962
                      utils.FormatSeconds(mstat.estimated_time))
3963
          max_time = mstat.estimated_time
3964
        else:
3965
          rem_time = "no time estimate"
3966
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3967
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3968

    
3969
    # if we're done but degraded, let's do a few small retries, to
3970
    # make sure we see a stable and not transient situation; therefore
3971
    # we force restart of the loop
3972
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3973
      logging.info("Degraded disks found, %d retries left", degr_retries)
3974
      degr_retries -= 1
3975
      time.sleep(1)
3976
      continue
3977

    
3978
    if done or oneshot:
3979
      break
3980

    
3981
    time.sleep(min(60, max_time))
3982

    
3983
  if done:
3984
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
3985
  return not cumul_degraded
3986

    
3987

    
3988
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3989
  """Check that mirrors are not degraded.
3990

3991
  The ldisk parameter, if True, will change the test from the
3992
  is_degraded attribute (which represents overall non-ok status for
3993
  the device(s)) to the ldisk (representing the local storage status).
3994

3995
  """
3996
  lu.cfg.SetDiskID(dev, node)
3997

    
3998
  result = True
3999

    
4000
  if on_primary or dev.AssembleOnSecondary():
4001
    rstats = lu.rpc.call_blockdev_find(node, dev)
4002
    msg = rstats.fail_msg
4003
    if msg:
4004
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
4005
      result = False
4006
    elif not rstats.payload:
4007
      lu.LogWarning("Can't find disk on node %s", node)
4008
      result = False
4009
    else:
4010
      if ldisk:
4011
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4012
      else:
4013
        result = result and not rstats.payload.is_degraded
4014

    
4015
  if dev.children:
4016
    for child in dev.children:
4017
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
4018

    
4019
  return result
4020

    
4021

    
4022
class LUOobCommand(NoHooksLU):
4023
  """Logical unit for OOB handling.
4024

4025
  """
4026
  REG_BGL = False
4027
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
4028

    
4029
  def ExpandNames(self):
4030
    """Gather locks we need.
4031

4032
    """
4033
    if self.op.node_names:
4034
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
4035
      lock_names = self.op.node_names
4036
    else:
4037
      lock_names = locking.ALL_SET
4038

    
4039
    self.needed_locks = {
4040
      locking.LEVEL_NODE: lock_names,
4041
      }
4042

    
4043
  def CheckPrereq(self):
4044
    """Check prerequisites.
4045

4046
    This checks:
4047
     - the node exists in the configuration
4048
     - OOB is supported
4049

4050
    Any errors are signaled by raising errors.OpPrereqError.
4051

4052
    """
4053
    self.nodes = []
4054
    self.master_node = self.cfg.GetMasterNode()
4055

    
4056
    assert self.op.power_delay >= 0.0
4057

    
4058
    if self.op.node_names:
4059
      if (self.op.command in self._SKIP_MASTER and
4060
          self.master_node in self.op.node_names):
4061
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
4062
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
4063

    
4064
        if master_oob_handler:
4065
          additional_text = ("run '%s %s %s' if you want to operate on the"
4066
                             " master regardless") % (master_oob_handler,
4067
                                                      self.op.command,
4068
                                                      self.master_node)
4069
        else:
4070
          additional_text = "it does not support out-of-band operations"
4071

    
4072
        raise errors.OpPrereqError(("Operating on the master node %s is not"
4073
                                    " allowed for %s; %s") %
4074
                                   (self.master_node, self.op.command,
4075
                                    additional_text), errors.ECODE_INVAL)
4076
    else:
4077
      self.op.node_names = self.cfg.GetNodeList()
4078
      if self.op.command in self._SKIP_MASTER:
4079
        self.op.node_names.remove(self.master_node)
4080

    
4081
    if self.op.command in self._SKIP_MASTER:
4082
      assert self.master_node not in self.op.node_names
4083

    
4084
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
4085
      if node is None:
4086
        raise errors.OpPrereqError("Node %s not found" % node_name,
4087
                                   errors.ECODE_NOENT)
4088
      else:
4089
        self.nodes.append(node)
4090

    
4091
      if (not self.op.ignore_status and
4092
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
4093
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
4094
                                    " not marked offline"