Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 96e0d5cc

History | View | Annotate | Download (481.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62
from ganeti import rpc
63

    
64
import ganeti.masterd.instance # pylint: disable=W0611
65

    
66

    
67
#: Size of DRBD meta block device
68
DRBD_META_SIZE = 128
69

    
70

    
71
class ResultWithJobs:
72
  """Data container for LU results with jobs.
73

74
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
75
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
76
  contained in the C{jobs} attribute and include the job IDs in the opcode
77
  result.
78

79
  """
80
  def __init__(self, jobs, **kwargs):
81
    """Initializes this class.
82

83
    Additional return values can be specified as keyword arguments.
84

85
    @type jobs: list of lists of L{opcode.OpCode}
86
    @param jobs: A list of lists of opcode objects
87

88
    """
89
    self.jobs = jobs
90
    self.other = kwargs
91

    
92

    
93
class LogicalUnit(object):
94
  """Logical Unit base class.
95

96
  Subclasses must follow these rules:
97
    - implement ExpandNames
98
    - implement CheckPrereq (except when tasklets are used)
99
    - implement Exec (except when tasklets are used)
100
    - implement BuildHooksEnv
101
    - implement BuildHooksNodes
102
    - redefine HPATH and HTYPE
103
    - optionally redefine their run requirements:
104
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
105

106
  Note that all commands require root permissions.
107

108
  @ivar dry_run_result: the value (if any) that will be returned to the caller
109
      in dry-run mode (signalled by opcode dry_run parameter)
110

111
  """
112
  HPATH = None
113
  HTYPE = None
114
  REQ_BGL = True
115

    
116
  def __init__(self, processor, op, context, rpc_runner):
117
    """Constructor for LogicalUnit.
118

119
    This needs to be overridden in derived classes in order to check op
120
    validity.
121

122
    """
123
    self.proc = processor
124
    self.op = op
125
    self.cfg = context.cfg
126
    self.glm = context.glm
127
    # readability alias
128
    self.owned_locks = context.glm.list_owned
129
    self.context = context
130
    self.rpc = rpc_runner
131
    # Dicts used to declare locking needs to mcpu
132
    self.needed_locks = None
133
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
134
    self.add_locks = {}
135
    self.remove_locks = {}
136
    # Used to force good behavior when calling helper functions
137
    self.recalculate_locks = {}
138
    # logging
139
    self.Log = processor.Log # pylint: disable=C0103
140
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
141
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
142
    self.LogStep = processor.LogStep # pylint: disable=C0103
143
    # support for dry-run
144
    self.dry_run_result = None
145
    # support for generic debug attribute
146
    if (not hasattr(self.op, "debug_level") or
147
        not isinstance(self.op.debug_level, int)):
148
      self.op.debug_level = 0
149

    
150
    # Tasklets
151
    self.tasklets = None
152

    
153
    # Validate opcode parameters and set defaults
154
    self.op.Validate(True)
155

    
156
    self.CheckArguments()
157

    
158
  def CheckArguments(self):
159
    """Check syntactic validity for the opcode arguments.
160

161
    This method is for doing a simple syntactic check and ensure
162
    validity of opcode parameters, without any cluster-related
163
    checks. While the same can be accomplished in ExpandNames and/or
164
    CheckPrereq, doing these separate is better because:
165

166
      - ExpandNames is left as as purely a lock-related function
167
      - CheckPrereq is run after we have acquired locks (and possible
168
        waited for them)
169

170
    The function is allowed to change the self.op attribute so that
171
    later methods can no longer worry about missing parameters.
172

173
    """
174
    pass
175

    
176
  def ExpandNames(self):
177
    """Expand names for this LU.
178

179
    This method is called before starting to execute the opcode, and it should
180
    update all the parameters of the opcode to their canonical form (e.g. a
181
    short node name must be fully expanded after this method has successfully
182
    completed). This way locking, hooks, logging, etc. can work correctly.
183

184
    LUs which implement this method must also populate the self.needed_locks
185
    member, as a dict with lock levels as keys, and a list of needed lock names
186
    as values. Rules:
187

188
      - use an empty dict if you don't need any lock
189
      - if you don't need any lock at a particular level omit that level
190
      - don't put anything for the BGL level
191
      - if you want all locks at a level use locking.ALL_SET as a value
192

193
    If you need to share locks (rather than acquire them exclusively) at one
194
    level you can modify self.share_locks, setting a true value (usually 1) for
195
    that level. By default locks are not shared.
196

197
    This function can also define a list of tasklets, which then will be
198
    executed in order instead of the usual LU-level CheckPrereq and Exec
199
    functions, if those are not defined by the LU.
200

201
    Examples::
202

203
      # Acquire all nodes and one instance
204
      self.needed_locks = {
205
        locking.LEVEL_NODE: locking.ALL_SET,
206
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
207
      }
208
      # Acquire just two nodes
209
      self.needed_locks = {
210
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
211
      }
212
      # Acquire no locks
213
      self.needed_locks = {} # No, you can't leave it to the default value None
214

215
    """
216
    # The implementation of this method is mandatory only if the new LU is
217
    # concurrent, so that old LUs don't need to be changed all at the same
218
    # time.
219
    if self.REQ_BGL:
220
      self.needed_locks = {} # Exclusive LUs don't need locks.
221
    else:
222
      raise NotImplementedError
223

    
224
  def DeclareLocks(self, level):
225
    """Declare LU locking needs for a level
226

227
    While most LUs can just declare their locking needs at ExpandNames time,
228
    sometimes there's the need to calculate some locks after having acquired
229
    the ones before. This function is called just before acquiring locks at a
230
    particular level, but after acquiring the ones at lower levels, and permits
231
    such calculations. It can be used to modify self.needed_locks, and by
232
    default it does nothing.
233

234
    This function is only called if you have something already set in
235
    self.needed_locks for the level.
236

237
    @param level: Locking level which is going to be locked
238
    @type level: member of ganeti.locking.LEVELS
239

240
    """
241

    
242
  def CheckPrereq(self):
243
    """Check prerequisites for this LU.
244

245
    This method should check that the prerequisites for the execution
246
    of this LU are fulfilled. It can do internode communication, but
247
    it should be idempotent - no cluster or system changes are
248
    allowed.
249

250
    The method should raise errors.OpPrereqError in case something is
251
    not fulfilled. Its return value is ignored.
252

253
    This method should also update all the parameters of the opcode to
254
    their canonical form if it hasn't been done by ExpandNames before.
255

256
    """
257
    if self.tasklets is not None:
258
      for (idx, tl) in enumerate(self.tasklets):
259
        logging.debug("Checking prerequisites for tasklet %s/%s",
260
                      idx + 1, len(self.tasklets))
261
        tl.CheckPrereq()
262
    else:
263
      pass
264

    
265
  def Exec(self, feedback_fn):
266
    """Execute the LU.
267

268
    This method should implement the actual work. It should raise
269
    errors.OpExecError for failures that are somewhat dealt with in
270
    code, or expected.
271

272
    """
273
    if self.tasklets is not None:
274
      for (idx, tl) in enumerate(self.tasklets):
275
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
276
        tl.Exec(feedback_fn)
277
    else:
278
      raise NotImplementedError
279

    
280
  def BuildHooksEnv(self):
281
    """Build hooks environment for this LU.
282

283
    @rtype: dict
284
    @return: Dictionary containing the environment that will be used for
285
      running the hooks for this LU. The keys of the dict must not be prefixed
286
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
287
      will extend the environment with additional variables. If no environment
288
      should be defined, an empty dictionary should be returned (not C{None}).
289
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
290
      will not be called.
291

292
    """
293
    raise NotImplementedError
294

    
295
  def BuildHooksNodes(self):
296
    """Build list of nodes to run LU's hooks.
297

298
    @rtype: tuple; (list, list)
299
    @return: Tuple containing a list of node names on which the hook
300
      should run before the execution and a list of node names on which the
301
      hook should run after the execution. No nodes should be returned as an
302
      empty list (and not None).
303
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
304
      will not be called.
305

306
    """
307
    raise NotImplementedError
308

    
309
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
310
    """Notify the LU about the results of its hooks.
311

312
    This method is called every time a hooks phase is executed, and notifies
313
    the Logical Unit about the hooks' result. The LU can then use it to alter
314
    its result based on the hooks.  By default the method does nothing and the
315
    previous result is passed back unchanged but any LU can define it if it
316
    wants to use the local cluster hook-scripts somehow.
317

318
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
319
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
320
    @param hook_results: the results of the multi-node hooks rpc call
321
    @param feedback_fn: function used send feedback back to the caller
322
    @param lu_result: the previous Exec result this LU had, or None
323
        in the PRE phase
324
    @return: the new Exec result, based on the previous result
325
        and hook results
326

327
    """
328
    # API must be kept, thus we ignore the unused argument and could
329
    # be a function warnings
330
    # pylint: disable=W0613,R0201
331
    return lu_result
332

    
333
  def _ExpandAndLockInstance(self):
334
    """Helper function to expand and lock an instance.
335

336
    Many LUs that work on an instance take its name in self.op.instance_name
337
    and need to expand it and then declare the expanded name for locking. This
338
    function does it, and then updates self.op.instance_name to the expanded
339
    name. It also initializes needed_locks as a dict, if this hasn't been done
340
    before.
341

342
    """
343
    if self.needed_locks is None:
344
      self.needed_locks = {}
345
    else:
346
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
347
        "_ExpandAndLockInstance called with instance-level locks set"
348
    self.op.instance_name = _ExpandInstanceName(self.cfg,
349
                                                self.op.instance_name)
350
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
351

    
352
  def _LockInstancesNodes(self, primary_only=False):
353
    """Helper function to declare instances' nodes for locking.
354

355
    This function should be called after locking one or more instances to lock
356
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
357
    with all primary or secondary nodes for instances already locked and
358
    present in self.needed_locks[locking.LEVEL_INSTANCE].
359

360
    It should be called from DeclareLocks, and for safety only works if
361
    self.recalculate_locks[locking.LEVEL_NODE] is set.
362

363
    In the future it may grow parameters to just lock some instance's nodes, or
364
    to just lock primaries or secondary nodes, if needed.
365

366
    If should be called in DeclareLocks in a way similar to::
367

368
      if level == locking.LEVEL_NODE:
369
        self._LockInstancesNodes()
370

371
    @type primary_only: boolean
372
    @param primary_only: only lock primary nodes of locked instances
373

374
    """
375
    assert locking.LEVEL_NODE in self.recalculate_locks, \
376
      "_LockInstancesNodes helper function called with no nodes to recalculate"
377

    
378
    # TODO: check if we're really been called with the instance locks held
379

    
380
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
381
    # future we might want to have different behaviors depending on the value
382
    # of self.recalculate_locks[locking.LEVEL_NODE]
383
    wanted_nodes = []
384
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
385
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
386
      wanted_nodes.append(instance.primary_node)
387
      if not primary_only:
388
        wanted_nodes.extend(instance.secondary_nodes)
389

    
390
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
391
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
392
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
393
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
394

    
395
    del self.recalculate_locks[locking.LEVEL_NODE]
396

    
397

    
398
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
399
  """Simple LU which runs no hooks.
400

401
  This LU is intended as a parent for other LogicalUnits which will
402
  run no hooks, in order to reduce duplicate code.
403

404
  """
405
  HPATH = None
406
  HTYPE = None
407

    
408
  def BuildHooksEnv(self):
409
    """Empty BuildHooksEnv for NoHooksLu.
410

411
    This just raises an error.
412

413
    """
414
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
415

    
416
  def BuildHooksNodes(self):
417
    """Empty BuildHooksNodes for NoHooksLU.
418

419
    """
420
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
421

    
422

    
423
class Tasklet:
424
  """Tasklet base class.
425

426
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
427
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
428
  tasklets know nothing about locks.
429

430
  Subclasses must follow these rules:
431
    - Implement CheckPrereq
432
    - Implement Exec
433

434
  """
435
  def __init__(self, lu):
436
    self.lu = lu
437

    
438
    # Shortcuts
439
    self.cfg = lu.cfg
440
    self.rpc = lu.rpc
441

    
442
  def CheckPrereq(self):
443
    """Check prerequisites for this tasklets.
444

445
    This method should check whether the prerequisites for the execution of
446
    this tasklet are fulfilled. It can do internode communication, but it
447
    should be idempotent - no cluster or system changes are allowed.
448

449
    The method should raise errors.OpPrereqError in case something is not
450
    fulfilled. Its return value is ignored.
451

452
    This method should also update all parameters to their canonical form if it
453
    hasn't been done before.
454

455
    """
456
    pass
457

    
458
  def Exec(self, feedback_fn):
459
    """Execute the tasklet.
460

461
    This method should implement the actual work. It should raise
462
    errors.OpExecError for failures that are somewhat dealt with in code, or
463
    expected.
464

465
    """
466
    raise NotImplementedError
467

    
468

    
469
class _QueryBase:
470
  """Base for query utility classes.
471

472
  """
473
  #: Attribute holding field definitions
474
  FIELDS = None
475

    
476
  def __init__(self, qfilter, fields, use_locking):
477
    """Initializes this class.
478

479
    """
480
    self.use_locking = use_locking
481

    
482
    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
483
                             namefield="name")
484
    self.requested_data = self.query.RequestedData()
485
    self.names = self.query.RequestedNames()
486

    
487
    # Sort only if no names were requested
488
    self.sort_by_name = not self.names
489

    
490
    self.do_locking = None
491
    self.wanted = None
492

    
493
  def _GetNames(self, lu, all_names, lock_level):
494
    """Helper function to determine names asked for in the query.
495

496
    """
497
    if self.do_locking:
498
      names = lu.owned_locks(lock_level)
499
    else:
500
      names = all_names
501

    
502
    if self.wanted == locking.ALL_SET:
503
      assert not self.names
504
      # caller didn't specify names, so ordering is not important
505
      return utils.NiceSort(names)
506

    
507
    # caller specified names and we must keep the same order
508
    assert self.names
509
    assert not self.do_locking or lu.glm.is_owned(lock_level)
510

    
511
    missing = set(self.wanted).difference(names)
512
    if missing:
513
      raise errors.OpExecError("Some items were removed before retrieving"
514
                               " their data: %s" % missing)
515

    
516
    # Return expanded names
517
    return self.wanted
518

    
519
  def ExpandNames(self, lu):
520
    """Expand names for this query.
521

522
    See L{LogicalUnit.ExpandNames}.
523

524
    """
525
    raise NotImplementedError()
526

    
527
  def DeclareLocks(self, lu, level):
528
    """Declare locks for this query.
529

530
    See L{LogicalUnit.DeclareLocks}.
531

532
    """
533
    raise NotImplementedError()
534

    
535
  def _GetQueryData(self, lu):
536
    """Collects all data for this query.
537

538
    @return: Query data object
539

540
    """
541
    raise NotImplementedError()
542

    
543
  def NewStyleQuery(self, lu):
544
    """Collect data and execute query.
545

546
    """
547
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
548
                                  sort_by_name=self.sort_by_name)
549

    
550
  def OldStyleQuery(self, lu):
551
    """Collect data and execute query.
552

553
    """
554
    return self.query.OldStyleQuery(self._GetQueryData(lu),
555
                                    sort_by_name=self.sort_by_name)
556

    
557

    
558
def _ShareAll():
559
  """Returns a dict declaring all lock levels shared.
560

561
  """
562
  return dict.fromkeys(locking.LEVELS, 1)
563

    
564

    
565
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
566
  """Checks if the owned node groups are still correct for an instance.
567

568
  @type cfg: L{config.ConfigWriter}
569
  @param cfg: The cluster configuration
570
  @type instance_name: string
571
  @param instance_name: Instance name
572
  @type owned_groups: set or frozenset
573
  @param owned_groups: List of currently owned node groups
574

575
  """
576
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
577

    
578
  if not owned_groups.issuperset(inst_groups):
579
    raise errors.OpPrereqError("Instance %s's node groups changed since"
580
                               " locks were acquired, current groups are"
581
                               " are '%s', owning groups '%s'; retry the"
582
                               " operation" %
583
                               (instance_name,
584
                                utils.CommaJoin(inst_groups),
585
                                utils.CommaJoin(owned_groups)),
586
                               errors.ECODE_STATE)
587

    
588
  return inst_groups
589

    
590

    
591
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
592
  """Checks if the instances in a node group are still correct.
593

594
  @type cfg: L{config.ConfigWriter}
595
  @param cfg: The cluster configuration
596
  @type group_uuid: string
597
  @param group_uuid: Node group UUID
598
  @type owned_instances: set or frozenset
599
  @param owned_instances: List of currently owned instances
600

601
  """
602
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
603
  if owned_instances != wanted_instances:
604
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
605
                               " locks were acquired, wanted '%s', have '%s';"
606
                               " retry the operation" %
607
                               (group_uuid,
608
                                utils.CommaJoin(wanted_instances),
609
                                utils.CommaJoin(owned_instances)),
610
                               errors.ECODE_STATE)
611

    
612
  return wanted_instances
613

    
614

    
615
def _SupportsOob(cfg, node):
616
  """Tells if node supports OOB.
617

618
  @type cfg: L{config.ConfigWriter}
619
  @param cfg: The cluster configuration
620
  @type node: L{objects.Node}
621
  @param node: The node
622
  @return: The OOB script if supported or an empty string otherwise
623

624
  """
625
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
626

    
627

    
628
def _GetWantedNodes(lu, nodes):
629
  """Returns list of checked and expanded node names.
630

631
  @type lu: L{LogicalUnit}
632
  @param lu: the logical unit on whose behalf we execute
633
  @type nodes: list
634
  @param nodes: list of node names or None for all nodes
635
  @rtype: list
636
  @return: the list of nodes, sorted
637
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
638

639
  """
640
  if nodes:
641
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
642

    
643
  return utils.NiceSort(lu.cfg.GetNodeList())
644

    
645

    
646
def _GetWantedInstances(lu, instances):
647
  """Returns list of checked and expanded instance names.
648

649
  @type lu: L{LogicalUnit}
650
  @param lu: the logical unit on whose behalf we execute
651
  @type instances: list
652
  @param instances: list of instance names or None for all instances
653
  @rtype: list
654
  @return: the list of instances, sorted
655
  @raise errors.OpPrereqError: if the instances parameter is wrong type
656
  @raise errors.OpPrereqError: if any of the passed instances is not found
657

658
  """
659
  if instances:
660
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
661
  else:
662
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
663
  return wanted
664

    
665

    
666
def _GetUpdatedParams(old_params, update_dict,
667
                      use_default=True, use_none=False):
668
  """Return the new version of a parameter dictionary.
669

670
  @type old_params: dict
671
  @param old_params: old parameters
672
  @type update_dict: dict
673
  @param update_dict: dict containing new parameter values, or
674
      constants.VALUE_DEFAULT to reset the parameter to its default
675
      value
676
  @param use_default: boolean
677
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
678
      values as 'to be deleted' values
679
  @param use_none: boolean
680
  @type use_none: whether to recognise C{None} values as 'to be
681
      deleted' values
682
  @rtype: dict
683
  @return: the new parameter dictionary
684

685
  """
686
  params_copy = copy.deepcopy(old_params)
687
  for key, val in update_dict.iteritems():
688
    if ((use_default and val == constants.VALUE_DEFAULT) or
689
        (use_none and val is None)):
690
      try:
691
        del params_copy[key]
692
      except KeyError:
693
        pass
694
    else:
695
      params_copy[key] = val
696
  return params_copy
697

    
698

    
699
def _ReleaseLocks(lu, level, names=None, keep=None):
700
  """Releases locks owned by an LU.
701

702
  @type lu: L{LogicalUnit}
703
  @param level: Lock level
704
  @type names: list or None
705
  @param names: Names of locks to release
706
  @type keep: list or None
707
  @param keep: Names of locks to retain
708

709
  """
710
  assert not (keep is not None and names is not None), \
711
         "Only one of the 'names' and the 'keep' parameters can be given"
712

    
713
  if names is not None:
714
    should_release = names.__contains__
715
  elif keep:
716
    should_release = lambda name: name not in keep
717
  else:
718
    should_release = None
719

    
720
  if should_release:
721
    retain = []
722
    release = []
723

    
724
    # Determine which locks to release
725
    for name in lu.owned_locks(level):
726
      if should_release(name):
727
        release.append(name)
728
      else:
729
        retain.append(name)
730

    
731
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
732

    
733
    # Release just some locks
734
    lu.glm.release(level, names=release)
735

    
736
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
737
  else:
738
    # Release everything
739
    lu.glm.release(level)
740

    
741
    assert not lu.glm.is_owned(level), "No locks should be owned"
742

    
743

    
744
def _MapInstanceDisksToNodes(instances):
745
  """Creates a map from (node, volume) to instance name.
746

747
  @type instances: list of L{objects.Instance}
748
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
749

750
  """
751
  return dict(((node, vol), inst.name)
752
              for inst in instances
753
              for (node, vols) in inst.MapLVsByNode().items()
754
              for vol in vols)
755

    
756

    
757
def _RunPostHook(lu, node_name):
758
  """Runs the post-hook for an opcode on a single node.
759

760
  """
761
  hm = lu.proc.BuildHooksManager(lu)
762
  try:
763
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
764
  except:
765
    # pylint: disable=W0702
766
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
767

    
768

    
769
def _CheckOutputFields(static, dynamic, selected):
770
  """Checks whether all selected fields are valid.
771

772
  @type static: L{utils.FieldSet}
773
  @param static: static fields set
774
  @type dynamic: L{utils.FieldSet}
775
  @param dynamic: dynamic fields set
776

777
  """
778
  f = utils.FieldSet()
779
  f.Extend(static)
780
  f.Extend(dynamic)
781

    
782
  delta = f.NonMatching(selected)
783
  if delta:
784
    raise errors.OpPrereqError("Unknown output fields selected: %s"
785
                               % ",".join(delta), errors.ECODE_INVAL)
786

    
787

    
788
def _CheckGlobalHvParams(params):
789
  """Validates that given hypervisor params are not global ones.
790

791
  This will ensure that instances don't get customised versions of
792
  global params.
793

794
  """
795
  used_globals = constants.HVC_GLOBALS.intersection(params)
796
  if used_globals:
797
    msg = ("The following hypervisor parameters are global and cannot"
798
           " be customized at instance level, please modify them at"
799
           " cluster level: %s" % utils.CommaJoin(used_globals))
800
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
801

    
802

    
803
def _CheckNodeOnline(lu, node, msg=None):
804
  """Ensure that a given node is online.
805

806
  @param lu: the LU on behalf of which we make the check
807
  @param node: the node to check
808
  @param msg: if passed, should be a message to replace the default one
809
  @raise errors.OpPrereqError: if the node is offline
810

811
  """
812
  if msg is None:
813
    msg = "Can't use offline node"
814
  if lu.cfg.GetNodeInfo(node).offline:
815
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
816

    
817

    
818
def _CheckNodeNotDrained(lu, node):
819
  """Ensure that a given node is not drained.
820

821
  @param lu: the LU on behalf of which we make the check
822
  @param node: the node to check
823
  @raise errors.OpPrereqError: if the node is drained
824

825
  """
826
  if lu.cfg.GetNodeInfo(node).drained:
827
    raise errors.OpPrereqError("Can't use drained node %s" % node,
828
                               errors.ECODE_STATE)
829

    
830

    
831
def _CheckNodeVmCapable(lu, node):
832
  """Ensure that a given node is vm capable.
833

834
  @param lu: the LU on behalf of which we make the check
835
  @param node: the node to check
836
  @raise errors.OpPrereqError: if the node is not vm capable
837

838
  """
839
  if not lu.cfg.GetNodeInfo(node).vm_capable:
840
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
841
                               errors.ECODE_STATE)
842

    
843

    
844
def _CheckNodeHasOS(lu, node, os_name, force_variant):
845
  """Ensure that a node supports a given OS.
846

847
  @param lu: the LU on behalf of which we make the check
848
  @param node: the node to check
849
  @param os_name: the OS to query about
850
  @param force_variant: whether to ignore variant errors
851
  @raise errors.OpPrereqError: if the node is not supporting the OS
852

853
  """
854
  result = lu.rpc.call_os_get(node, os_name)
855
  result.Raise("OS '%s' not in supported OS list for node %s" %
856
               (os_name, node),
857
               prereq=True, ecode=errors.ECODE_INVAL)
858
  if not force_variant:
859
    _CheckOSVariant(result.payload, os_name)
860

    
861

    
862
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
863
  """Ensure that a node has the given secondary ip.
864

865
  @type lu: L{LogicalUnit}
866
  @param lu: the LU on behalf of which we make the check
867
  @type node: string
868
  @param node: the node to check
869
  @type secondary_ip: string
870
  @param secondary_ip: the ip to check
871
  @type prereq: boolean
872
  @param prereq: whether to throw a prerequisite or an execute error
873
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
874
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
875

876
  """
877
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
878
  result.Raise("Failure checking secondary ip on node %s" % node,
879
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
880
  if not result.payload:
881
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
882
           " please fix and re-run this command" % secondary_ip)
883
    if prereq:
884
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
885
    else:
886
      raise errors.OpExecError(msg)
887

    
888

    
889
def _GetClusterDomainSecret():
890
  """Reads the cluster domain secret.
891

892
  """
893
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
894
                               strict=True)
895

    
896

    
897
def _CheckInstanceDown(lu, instance, reason):
898
  """Ensure that an instance is not running."""
899
  if instance.admin_up:
900
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
901
                               (instance.name, reason), errors.ECODE_STATE)
902

    
903
  pnode = instance.primary_node
904
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
905
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
906
              prereq=True, ecode=errors.ECODE_ENVIRON)
907

    
908
  if instance.name in ins_l.payload:
909
    raise errors.OpPrereqError("Instance %s is running, %s" %
910
                               (instance.name, reason), errors.ECODE_STATE)
911

    
912

    
913
def _ExpandItemName(fn, name, kind):
914
  """Expand an item name.
915

916
  @param fn: the function to use for expansion
917
  @param name: requested item name
918
  @param kind: text description ('Node' or 'Instance')
919
  @return: the resolved (full) name
920
  @raise errors.OpPrereqError: if the item is not found
921

922
  """
923
  full_name = fn(name)
924
  if full_name is None:
925
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
926
                               errors.ECODE_NOENT)
927
  return full_name
928

    
929

    
930
def _ExpandNodeName(cfg, name):
931
  """Wrapper over L{_ExpandItemName} for nodes."""
932
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
933

    
934

    
935
def _ExpandInstanceName(cfg, name):
936
  """Wrapper over L{_ExpandItemName} for instance."""
937
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
938

    
939

    
940
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
941
                          memory, vcpus, nics, disk_template, disks,
942
                          bep, hvp, hypervisor_name, tags):
943
  """Builds instance related env variables for hooks
944

945
  This builds the hook environment from individual variables.
946

947
  @type name: string
948
  @param name: the name of the instance
949
  @type primary_node: string
950
  @param primary_node: the name of the instance's primary node
951
  @type secondary_nodes: list
952
  @param secondary_nodes: list of secondary nodes as strings
953
  @type os_type: string
954
  @param os_type: the name of the instance's OS
955
  @type status: boolean
956
  @param status: the should_run status of the instance
957
  @type memory: string
958
  @param memory: the memory size of the instance
959
  @type vcpus: string
960
  @param vcpus: the count of VCPUs the instance has
961
  @type nics: list
962
  @param nics: list of tuples (ip, mac, mode, link) representing
963
      the NICs the instance has
964
  @type disk_template: string
965
  @param disk_template: the disk template of the instance
966
  @type disks: list
967
  @param disks: the list of (size, mode) pairs
968
  @type bep: dict
969
  @param bep: the backend parameters for the instance
970
  @type hvp: dict
971
  @param hvp: the hypervisor parameters for the instance
972
  @type hypervisor_name: string
973
  @param hypervisor_name: the hypervisor for the instance
974
  @type tags: list
975
  @param tags: list of instance tags as strings
976
  @rtype: dict
977
  @return: the hook environment for this instance
978

979
  """
980
  if status:
981
    str_status = "up"
982
  else:
983
    str_status = "down"
984
  env = {
985
    "OP_TARGET": name,
986
    "INSTANCE_NAME": name,
987
    "INSTANCE_PRIMARY": primary_node,
988
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
989
    "INSTANCE_OS_TYPE": os_type,
990
    "INSTANCE_STATUS": str_status,
991
    "INSTANCE_MEMORY": memory,
992
    "INSTANCE_VCPUS": vcpus,
993
    "INSTANCE_DISK_TEMPLATE": disk_template,
994
    "INSTANCE_HYPERVISOR": hypervisor_name,
995
  }
996

    
997
  if nics:
998
    nic_count = len(nics)
999
    for idx, (ip, mac, mode, link) in enumerate(nics):
1000
      if ip is None:
1001
        ip = ""
1002
      env["INSTANCE_NIC%d_IP" % idx] = ip
1003
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1004
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1005
      env["INSTANCE_NIC%d_LINK" % idx] = link
1006
      if mode == constants.NIC_MODE_BRIDGED:
1007
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1008
  else:
1009
    nic_count = 0
1010

    
1011
  env["INSTANCE_NIC_COUNT"] = nic_count
1012

    
1013
  if disks:
1014
    disk_count = len(disks)
1015
    for idx, (size, mode) in enumerate(disks):
1016
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1017
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1018
  else:
1019
    disk_count = 0
1020

    
1021
  env["INSTANCE_DISK_COUNT"] = disk_count
1022

    
1023
  if not tags:
1024
    tags = []
1025

    
1026
  env["INSTANCE_TAGS"] = " ".join(tags)
1027

    
1028
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1029
    for key, value in source.items():
1030
      env["INSTANCE_%s_%s" % (kind, key)] = value
1031

    
1032
  return env
1033

    
1034

    
1035
def _NICListToTuple(lu, nics):
1036
  """Build a list of nic information tuples.
1037

1038
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1039
  value in LUInstanceQueryData.
1040

1041
  @type lu:  L{LogicalUnit}
1042
  @param lu: the logical unit on whose behalf we execute
1043
  @type nics: list of L{objects.NIC}
1044
  @param nics: list of nics to convert to hooks tuples
1045

1046
  """
1047
  hooks_nics = []
1048
  cluster = lu.cfg.GetClusterInfo()
1049
  for nic in nics:
1050
    ip = nic.ip
1051
    mac = nic.mac
1052
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1053
    mode = filled_params[constants.NIC_MODE]
1054
    link = filled_params[constants.NIC_LINK]
1055
    hooks_nics.append((ip, mac, mode, link))
1056
  return hooks_nics
1057

    
1058

    
1059
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1060
  """Builds instance related env variables for hooks from an object.
1061

1062
  @type lu: L{LogicalUnit}
1063
  @param lu: the logical unit on whose behalf we execute
1064
  @type instance: L{objects.Instance}
1065
  @param instance: the instance for which we should build the
1066
      environment
1067
  @type override: dict
1068
  @param override: dictionary with key/values that will override
1069
      our values
1070
  @rtype: dict
1071
  @return: the hook environment dictionary
1072

1073
  """
1074
  cluster = lu.cfg.GetClusterInfo()
1075
  bep = cluster.FillBE(instance)
1076
  hvp = cluster.FillHV(instance)
1077
  args = {
1078
    "name": instance.name,
1079
    "primary_node": instance.primary_node,
1080
    "secondary_nodes": instance.secondary_nodes,
1081
    "os_type": instance.os,
1082
    "status": instance.admin_up,
1083
    "memory": bep[constants.BE_MEMORY],
1084
    "vcpus": bep[constants.BE_VCPUS],
1085
    "nics": _NICListToTuple(lu, instance.nics),
1086
    "disk_template": instance.disk_template,
1087
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1088
    "bep": bep,
1089
    "hvp": hvp,
1090
    "hypervisor_name": instance.hypervisor,
1091
    "tags": instance.tags,
1092
  }
1093
  if override:
1094
    args.update(override)
1095
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1096

    
1097

    
1098
def _AdjustCandidatePool(lu, exceptions):
1099
  """Adjust the candidate pool after node operations.
1100

1101
  """
1102
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1103
  if mod_list:
1104
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1105
               utils.CommaJoin(node.name for node in mod_list))
1106
    for name in mod_list:
1107
      lu.context.ReaddNode(name)
1108
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1109
  if mc_now > mc_max:
1110
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1111
               (mc_now, mc_max))
1112

    
1113

    
1114
def _DecideSelfPromotion(lu, exceptions=None):
1115
  """Decide whether I should promote myself as a master candidate.
1116

1117
  """
1118
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1119
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1120
  # the new node will increase mc_max with one, so:
1121
  mc_should = min(mc_should + 1, cp_size)
1122
  return mc_now < mc_should
1123

    
1124

    
1125
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1126
  """Check that the brigdes needed by a list of nics exist.
1127

1128
  """
1129
  cluster = lu.cfg.GetClusterInfo()
1130
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1131
  brlist = [params[constants.NIC_LINK] for params in paramslist
1132
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1133
  if brlist:
1134
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1135
    result.Raise("Error checking bridges on destination node '%s'" %
1136
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1137

    
1138

    
1139
def _CheckInstanceBridgesExist(lu, instance, node=None):
1140
  """Check that the brigdes needed by an instance exist.
1141

1142
  """
1143
  if node is None:
1144
    node = instance.primary_node
1145
  _CheckNicsBridgesExist(lu, instance.nics, node)
1146

    
1147

    
1148
def _CheckOSVariant(os_obj, name):
1149
  """Check whether an OS name conforms to the os variants specification.
1150

1151
  @type os_obj: L{objects.OS}
1152
  @param os_obj: OS object to check
1153
  @type name: string
1154
  @param name: OS name passed by the user, to check for validity
1155

1156
  """
1157
  variant = objects.OS.GetVariant(name)
1158
  if not os_obj.supported_variants:
1159
    if variant:
1160
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1161
                                 " passed)" % (os_obj.name, variant),
1162
                                 errors.ECODE_INVAL)
1163
    return
1164
  if not variant:
1165
    raise errors.OpPrereqError("OS name must include a variant",
1166
                               errors.ECODE_INVAL)
1167

    
1168
  if variant not in os_obj.supported_variants:
1169
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1170

    
1171

    
1172
def _GetNodeInstancesInner(cfg, fn):
1173
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1174

    
1175

    
1176
def _GetNodeInstances(cfg, node_name):
1177
  """Returns a list of all primary and secondary instances on a node.
1178

1179
  """
1180

    
1181
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1182

    
1183

    
1184
def _GetNodePrimaryInstances(cfg, node_name):
1185
  """Returns primary instances on a node.
1186

1187
  """
1188
  return _GetNodeInstancesInner(cfg,
1189
                                lambda inst: node_name == inst.primary_node)
1190

    
1191

    
1192
def _GetNodeSecondaryInstances(cfg, node_name):
1193
  """Returns secondary instances on a node.
1194

1195
  """
1196
  return _GetNodeInstancesInner(cfg,
1197
                                lambda inst: node_name in inst.secondary_nodes)
1198

    
1199

    
1200
def _GetStorageTypeArgs(cfg, storage_type):
1201
  """Returns the arguments for a storage type.
1202

1203
  """
1204
  # Special case for file storage
1205
  if storage_type == constants.ST_FILE:
1206
    # storage.FileStorage wants a list of storage directories
1207
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1208

    
1209
  return []
1210

    
1211

    
1212
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1213
  faulty = []
1214

    
1215
  for dev in instance.disks:
1216
    cfg.SetDiskID(dev, node_name)
1217

    
1218
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
1219
  result.Raise("Failed to get disk status from node %s" % node_name,
1220
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1221

    
1222
  for idx, bdev_status in enumerate(result.payload):
1223
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1224
      faulty.append(idx)
1225

    
1226
  return faulty
1227

    
1228

    
1229
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1230
  """Check the sanity of iallocator and node arguments and use the
1231
  cluster-wide iallocator if appropriate.
1232

1233
  Check that at most one of (iallocator, node) is specified. If none is
1234
  specified, then the LU's opcode's iallocator slot is filled with the
1235
  cluster-wide default iallocator.
1236

1237
  @type iallocator_slot: string
1238
  @param iallocator_slot: the name of the opcode iallocator slot
1239
  @type node_slot: string
1240
  @param node_slot: the name of the opcode target node slot
1241

1242
  """
1243
  node = getattr(lu.op, node_slot, None)
1244
  iallocator = getattr(lu.op, iallocator_slot, None)
1245

    
1246
  if node is not None and iallocator is not None:
1247
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1248
                               errors.ECODE_INVAL)
1249
  elif node is None and iallocator is None:
1250
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1251
    if default_iallocator:
1252
      setattr(lu.op, iallocator_slot, default_iallocator)
1253
    else:
1254
      raise errors.OpPrereqError("No iallocator or node given and no"
1255
                                 " cluster-wide default iallocator found;"
1256
                                 " please specify either an iallocator or a"
1257
                                 " node, or set a cluster-wide default"
1258
                                 " iallocator")
1259

    
1260

    
1261
def _GetDefaultIAllocator(cfg, iallocator):
1262
  """Decides on which iallocator to use.
1263

1264
  @type cfg: L{config.ConfigWriter}
1265
  @param cfg: Cluster configuration object
1266
  @type iallocator: string or None
1267
  @param iallocator: Iallocator specified in opcode
1268
  @rtype: string
1269
  @return: Iallocator name
1270

1271
  """
1272
  if not iallocator:
1273
    # Use default iallocator
1274
    iallocator = cfg.GetDefaultIAllocator()
1275

    
1276
  if not iallocator:
1277
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1278
                               " opcode nor as a cluster-wide default",
1279
                               errors.ECODE_INVAL)
1280

    
1281
  return iallocator
1282

    
1283

    
1284
class LUClusterPostInit(LogicalUnit):
1285
  """Logical unit for running hooks after cluster initialization.
1286

1287
  """
1288
  HPATH = "cluster-init"
1289
  HTYPE = constants.HTYPE_CLUSTER
1290

    
1291
  def BuildHooksEnv(self):
1292
    """Build hooks env.
1293

1294
    """
1295
    return {
1296
      "OP_TARGET": self.cfg.GetClusterName(),
1297
      }
1298

    
1299
  def BuildHooksNodes(self):
1300
    """Build hooks nodes.
1301

1302
    """
1303
    return ([], [self.cfg.GetMasterNode()])
1304

    
1305
  def Exec(self, feedback_fn):
1306
    """Nothing to do.
1307

1308
    """
1309
    return True
1310

    
1311

    
1312
class LUClusterDestroy(LogicalUnit):
1313
  """Logical unit for destroying the cluster.
1314

1315
  """
1316
  HPATH = "cluster-destroy"
1317
  HTYPE = constants.HTYPE_CLUSTER
1318

    
1319
  def BuildHooksEnv(self):
1320
    """Build hooks env.
1321

1322
    """
1323
    return {
1324
      "OP_TARGET": self.cfg.GetClusterName(),
1325
      }
1326

    
1327
  def BuildHooksNodes(self):
1328
    """Build hooks nodes.
1329

1330
    """
1331
    return ([], [])
1332

    
1333
  def CheckPrereq(self):
1334
    """Check prerequisites.
1335

1336
    This checks whether the cluster is empty.
1337

1338
    Any errors are signaled by raising errors.OpPrereqError.
1339

1340
    """
1341
    master = self.cfg.GetMasterNode()
1342

    
1343
    nodelist = self.cfg.GetNodeList()
1344
    if len(nodelist) != 1 or nodelist[0] != master:
1345
      raise errors.OpPrereqError("There are still %d node(s) in"
1346
                                 " this cluster." % (len(nodelist) - 1),
1347
                                 errors.ECODE_INVAL)
1348
    instancelist = self.cfg.GetInstanceList()
1349
    if instancelist:
1350
      raise errors.OpPrereqError("There are still %d instance(s) in"
1351
                                 " this cluster." % len(instancelist),
1352
                                 errors.ECODE_INVAL)
1353

    
1354
  def Exec(self, feedback_fn):
1355
    """Destroys the cluster.
1356

1357
    """
1358
    (master, ip, dev, netmask, _) = self.cfg.GetMasterNetworkParameters()
1359

    
1360
    # Run post hooks on master node before it's removed
1361
    _RunPostHook(self, master)
1362

    
1363
    result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev)
1364
    result.Raise("Could not disable the master role")
1365

    
1366
    return master
1367

    
1368

    
1369
def _VerifyCertificate(filename):
1370
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1371

1372
  @type filename: string
1373
  @param filename: Path to PEM file
1374

1375
  """
1376
  try:
1377
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1378
                                           utils.ReadFile(filename))
1379
  except Exception, err: # pylint: disable=W0703
1380
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1381
            "Failed to load X509 certificate %s: %s" % (filename, err))
1382

    
1383
  (errcode, msg) = \
1384
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1385
                                constants.SSL_CERT_EXPIRATION_ERROR)
1386

    
1387
  if msg:
1388
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1389
  else:
1390
    fnamemsg = None
1391

    
1392
  if errcode is None:
1393
    return (None, fnamemsg)
1394
  elif errcode == utils.CERT_WARNING:
1395
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1396
  elif errcode == utils.CERT_ERROR:
1397
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1398

    
1399
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1400

    
1401

    
1402
def _GetAllHypervisorParameters(cluster, instances):
1403
  """Compute the set of all hypervisor parameters.
1404

1405
  @type cluster: L{objects.Cluster}
1406
  @param cluster: the cluster object
1407
  @param instances: list of L{objects.Instance}
1408
  @param instances: additional instances from which to obtain parameters
1409
  @rtype: list of (origin, hypervisor, parameters)
1410
  @return: a list with all parameters found, indicating the hypervisor they
1411
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1412

1413
  """
1414
  hvp_data = []
1415

    
1416
  for hv_name in cluster.enabled_hypervisors:
1417
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1418

    
1419
  for os_name, os_hvp in cluster.os_hvp.items():
1420
    for hv_name, hv_params in os_hvp.items():
1421
      if hv_params:
1422
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1423
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1424

    
1425
  # TODO: collapse identical parameter values in a single one
1426
  for instance in instances:
1427
    if instance.hvparams:
1428
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1429
                       cluster.FillHV(instance)))
1430

    
1431
  return hvp_data
1432

    
1433

    
1434
class _VerifyErrors(object):
1435
  """Mix-in for cluster/group verify LUs.
1436

1437
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1438
  self.op and self._feedback_fn to be available.)
1439

1440
  """
1441

    
1442
  ETYPE_FIELD = "code"
1443
  ETYPE_ERROR = "ERROR"
1444
  ETYPE_WARNING = "WARNING"
1445

    
1446
  def _Error(self, ecode, item, msg, *args, **kwargs):
1447
    """Format an error message.
1448

1449
    Based on the opcode's error_codes parameter, either format a
1450
    parseable error code, or a simpler error string.
1451

1452
    This must be called only from Exec and functions called from Exec.
1453

1454
    """
1455
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1456
    itype, etxt, _ = ecode
1457
    # first complete the msg
1458
    if args:
1459
      msg = msg % args
1460
    # then format the whole message
1461
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1462
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1463
    else:
1464
      if item:
1465
        item = " " + item
1466
      else:
1467
        item = ""
1468
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1469
    # and finally report it via the feedback_fn
1470
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1471

    
1472
  def _ErrorIf(self, cond, ecode, *args, **kwargs):
1473
    """Log an error message if the passed condition is True.
1474

1475
    """
1476
    cond = (bool(cond)
1477
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1478

    
1479
    # If the error code is in the list of ignored errors, demote the error to a
1480
    # warning
1481
    (_, etxt, _) = ecode
1482
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1483
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1484

    
1485
    if cond:
1486
      self._Error(ecode, *args, **kwargs)
1487

    
1488
    # do not mark the operation as failed for WARN cases only
1489
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1490
      self.bad = self.bad or cond
1491

    
1492

    
1493
class LUClusterVerify(NoHooksLU):
1494
  """Submits all jobs necessary to verify the cluster.
1495

1496
  """
1497
  REQ_BGL = False
1498

    
1499
  def ExpandNames(self):
1500
    self.needed_locks = {}
1501

    
1502
  def Exec(self, feedback_fn):
1503
    jobs = []
1504

    
1505
    if self.op.group_name:
1506
      groups = [self.op.group_name]
1507
      depends_fn = lambda: None
1508
    else:
1509
      groups = self.cfg.GetNodeGroupList()
1510

    
1511
      # Verify global configuration
1512
      jobs.append([
1513
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1514
        ])
1515

    
1516
      # Always depend on global verification
1517
      depends_fn = lambda: [(-len(jobs), [])]
1518

    
1519
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1520
                                            ignore_errors=self.op.ignore_errors,
1521
                                            depends=depends_fn())]
1522
                for group in groups)
1523

    
1524
    # Fix up all parameters
1525
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1526
      op.debug_simulate_errors = self.op.debug_simulate_errors
1527
      op.verbose = self.op.verbose
1528
      op.error_codes = self.op.error_codes
1529
      try:
1530
        op.skip_checks = self.op.skip_checks
1531
      except AttributeError:
1532
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1533

    
1534
    return ResultWithJobs(jobs)
1535

    
1536

    
1537
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1538
  """Verifies the cluster config.
1539

1540
  """
1541
  REQ_BGL = True
1542

    
1543
  def _VerifyHVP(self, hvp_data):
1544
    """Verifies locally the syntax of the hypervisor parameters.
1545

1546
    """
1547
    for item, hv_name, hv_params in hvp_data:
1548
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1549
             (item, hv_name))
1550
      try:
1551
        hv_class = hypervisor.GetHypervisor(hv_name)
1552
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1553
        hv_class.CheckParameterSyntax(hv_params)
1554
      except errors.GenericError, err:
1555
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1556

    
1557
  def ExpandNames(self):
1558
    # Information can be safely retrieved as the BGL is acquired in exclusive
1559
    # mode
1560
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1561
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1562
    self.all_node_info = self.cfg.GetAllNodesInfo()
1563
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1564
    self.needed_locks = {}
1565

    
1566
  def Exec(self, feedback_fn):
1567
    """Verify integrity of cluster, performing various test on nodes.
1568

1569
    """
1570
    self.bad = False
1571
    self._feedback_fn = feedback_fn
1572

    
1573
    feedback_fn("* Verifying cluster config")
1574

    
1575
    for msg in self.cfg.VerifyConfig():
1576
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1577

    
1578
    feedback_fn("* Verifying cluster certificate files")
1579

    
1580
    for cert_filename in constants.ALL_CERT_FILES:
1581
      (errcode, msg) = _VerifyCertificate(cert_filename)
1582
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1583

    
1584
    feedback_fn("* Verifying hypervisor parameters")
1585

    
1586
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1587
                                                self.all_inst_info.values()))
1588

    
1589
    feedback_fn("* Verifying all nodes belong to an existing group")
1590

    
1591
    # We do this verification here because, should this bogus circumstance
1592
    # occur, it would never be caught by VerifyGroup, which only acts on
1593
    # nodes/instances reachable from existing node groups.
1594

    
1595
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1596
                         if node.group not in self.all_group_info)
1597

    
1598
    dangling_instances = {}
1599
    no_node_instances = []
1600

    
1601
    for inst in self.all_inst_info.values():
1602
      if inst.primary_node in dangling_nodes:
1603
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1604
      elif inst.primary_node not in self.all_node_info:
1605
        no_node_instances.append(inst.name)
1606

    
1607
    pretty_dangling = [
1608
        "%s (%s)" %
1609
        (node.name,
1610
         utils.CommaJoin(dangling_instances.get(node.name,
1611
                                                ["no instances"])))
1612
        for node in dangling_nodes]
1613

    
1614
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1615
                  None,
1616
                  "the following nodes (and their instances) belong to a non"
1617
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1618

    
1619
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1620
                  None,
1621
                  "the following instances have a non-existing primary-node:"
1622
                  " %s", utils.CommaJoin(no_node_instances))
1623

    
1624
    return not self.bad
1625

    
1626

    
1627
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1628
  """Verifies the status of a node group.
1629

1630
  """
1631
  HPATH = "cluster-verify"
1632
  HTYPE = constants.HTYPE_CLUSTER
1633
  REQ_BGL = False
1634

    
1635
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1636

    
1637
  class NodeImage(object):
1638
    """A class representing the logical and physical status of a node.
1639

1640
    @type name: string
1641
    @ivar name: the node name to which this object refers
1642
    @ivar volumes: a structure as returned from
1643
        L{ganeti.backend.GetVolumeList} (runtime)
1644
    @ivar instances: a list of running instances (runtime)
1645
    @ivar pinst: list of configured primary instances (config)
1646
    @ivar sinst: list of configured secondary instances (config)
1647
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1648
        instances for which this node is secondary (config)
1649
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1650
    @ivar dfree: free disk, as reported by the node (runtime)
1651
    @ivar offline: the offline status (config)
1652
    @type rpc_fail: boolean
1653
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1654
        not whether the individual keys were correct) (runtime)
1655
    @type lvm_fail: boolean
1656
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1657
    @type hyp_fail: boolean
1658
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1659
    @type ghost: boolean
1660
    @ivar ghost: whether this is a known node or not (config)
1661
    @type os_fail: boolean
1662
    @ivar os_fail: whether the RPC call didn't return valid OS data
1663
    @type oslist: list
1664
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1665
    @type vm_capable: boolean
1666
    @ivar vm_capable: whether the node can host instances
1667

1668
    """
1669
    def __init__(self, offline=False, name=None, vm_capable=True):
1670
      self.name = name
1671
      self.volumes = {}
1672
      self.instances = []
1673
      self.pinst = []
1674
      self.sinst = []
1675
      self.sbp = {}
1676
      self.mfree = 0
1677
      self.dfree = 0
1678
      self.offline = offline
1679
      self.vm_capable = vm_capable
1680
      self.rpc_fail = False
1681
      self.lvm_fail = False
1682
      self.hyp_fail = False
1683
      self.ghost = False
1684
      self.os_fail = False
1685
      self.oslist = {}
1686

    
1687
  def ExpandNames(self):
1688
    # This raises errors.OpPrereqError on its own:
1689
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1690

    
1691
    # Get instances in node group; this is unsafe and needs verification later
1692
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
1693

    
1694
    self.needed_locks = {
1695
      locking.LEVEL_INSTANCE: inst_names,
1696
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1697
      locking.LEVEL_NODE: [],
1698
      }
1699

    
1700
    self.share_locks = _ShareAll()
1701

    
1702
  def DeclareLocks(self, level):
1703
    if level == locking.LEVEL_NODE:
1704
      # Get members of node group; this is unsafe and needs verification later
1705
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1706

    
1707
      all_inst_info = self.cfg.GetAllInstancesInfo()
1708

    
1709
      # In Exec(), we warn about mirrored instances that have primary and
1710
      # secondary living in separate node groups. To fully verify that
1711
      # volumes for these instances are healthy, we will need to do an
1712
      # extra call to their secondaries. We ensure here those nodes will
1713
      # be locked.
1714
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1715
        # Important: access only the instances whose lock is owned
1716
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1717
          nodes.update(all_inst_info[inst].secondary_nodes)
1718

    
1719
      self.needed_locks[locking.LEVEL_NODE] = nodes
1720

    
1721
  def CheckPrereq(self):
1722
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1723
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1724

    
1725
    group_nodes = set(self.group_info.members)
1726
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
1727

    
1728
    unlocked_nodes = \
1729
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1730

    
1731
    unlocked_instances = \
1732
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1733

    
1734
    if unlocked_nodes:
1735
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1736
                                 utils.CommaJoin(unlocked_nodes))
1737

    
1738
    if unlocked_instances:
1739
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1740
                                 utils.CommaJoin(unlocked_instances))
1741

    
1742
    self.all_node_info = self.cfg.GetAllNodesInfo()
1743
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1744

    
1745
    self.my_node_names = utils.NiceSort(group_nodes)
1746
    self.my_inst_names = utils.NiceSort(group_instances)
1747

    
1748
    self.my_node_info = dict((name, self.all_node_info[name])
1749
                             for name in self.my_node_names)
1750

    
1751
    self.my_inst_info = dict((name, self.all_inst_info[name])
1752
                             for name in self.my_inst_names)
1753

    
1754
    # We detect here the nodes that will need the extra RPC calls for verifying
1755
    # split LV volumes; they should be locked.
1756
    extra_lv_nodes = set()
1757

    
1758
    for inst in self.my_inst_info.values():
1759
      if inst.disk_template in constants.DTS_INT_MIRROR:
1760
        group = self.my_node_info[inst.primary_node].group
1761
        for nname in inst.secondary_nodes:
1762
          if self.all_node_info[nname].group != group:
1763
            extra_lv_nodes.add(nname)
1764

    
1765
    unlocked_lv_nodes = \
1766
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1767

    
1768
    if unlocked_lv_nodes:
1769
      raise errors.OpPrereqError("these nodes could be locked: %s" %
1770
                                 utils.CommaJoin(unlocked_lv_nodes))
1771
    self.extra_lv_nodes = list(extra_lv_nodes)
1772

    
1773
  def _VerifyNode(self, ninfo, nresult):
1774
    """Perform some basic validation on data returned from a node.
1775

1776
      - check the result data structure is well formed and has all the
1777
        mandatory fields
1778
      - check ganeti version
1779

1780
    @type ninfo: L{objects.Node}
1781
    @param ninfo: the node to check
1782
    @param nresult: the results from the node
1783
    @rtype: boolean
1784
    @return: whether overall this call was successful (and we can expect
1785
         reasonable values in the respose)
1786

1787
    """
1788
    node = ninfo.name
1789
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1790

    
1791
    # main result, nresult should be a non-empty dict
1792
    test = not nresult or not isinstance(nresult, dict)
1793
    _ErrorIf(test, constants.CV_ENODERPC, node,
1794
                  "unable to verify node: no data returned")
1795
    if test:
1796
      return False
1797

    
1798
    # compares ganeti version
1799
    local_version = constants.PROTOCOL_VERSION
1800
    remote_version = nresult.get("version", None)
1801
    test = not (remote_version and
1802
                isinstance(remote_version, (list, tuple)) and
1803
                len(remote_version) == 2)
1804
    _ErrorIf(test, constants.CV_ENODERPC, node,
1805
             "connection to node returned invalid data")
1806
    if test:
1807
      return False
1808

    
1809
    test = local_version != remote_version[0]
1810
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1811
             "incompatible protocol versions: master %s,"
1812
             " node %s", local_version, remote_version[0])
1813
    if test:
1814
      return False
1815

    
1816
    # node seems compatible, we can actually try to look into its results
1817

    
1818
    # full package version
1819
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1820
                  constants.CV_ENODEVERSION, node,
1821
                  "software version mismatch: master %s, node %s",
1822
                  constants.RELEASE_VERSION, remote_version[1],
1823
                  code=self.ETYPE_WARNING)
1824

    
1825
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1826
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1827
      for hv_name, hv_result in hyp_result.iteritems():
1828
        test = hv_result is not None
1829
        _ErrorIf(test, constants.CV_ENODEHV, node,
1830
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1831

    
1832
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1833
    if ninfo.vm_capable and isinstance(hvp_result, list):
1834
      for item, hv_name, hv_result in hvp_result:
1835
        _ErrorIf(True, constants.CV_ENODEHV, node,
1836
                 "hypervisor %s parameter verify failure (source %s): %s",
1837
                 hv_name, item, hv_result)
1838

    
1839
    test = nresult.get(constants.NV_NODESETUP,
1840
                       ["Missing NODESETUP results"])
1841
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1842
             "; ".join(test))
1843

    
1844
    return True
1845

    
1846
  def _VerifyNodeTime(self, ninfo, nresult,
1847
                      nvinfo_starttime, nvinfo_endtime):
1848
    """Check the node time.
1849

1850
    @type ninfo: L{objects.Node}
1851
    @param ninfo: the node to check
1852
    @param nresult: the remote results for the node
1853
    @param nvinfo_starttime: the start time of the RPC call
1854
    @param nvinfo_endtime: the end time of the RPC call
1855

1856
    """
1857
    node = ninfo.name
1858
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1859

    
1860
    ntime = nresult.get(constants.NV_TIME, None)
1861
    try:
1862
      ntime_merged = utils.MergeTime(ntime)
1863
    except (ValueError, TypeError):
1864
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1865
      return
1866

    
1867
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1868
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1869
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1870
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1871
    else:
1872
      ntime_diff = None
1873

    
1874
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1875
             "Node time diverges by at least %s from master node time",
1876
             ntime_diff)
1877

    
1878
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1879
    """Check the node LVM results.
1880

1881
    @type ninfo: L{objects.Node}
1882
    @param ninfo: the node to check
1883
    @param nresult: the remote results for the node
1884
    @param vg_name: the configured VG name
1885

1886
    """
1887
    if vg_name is None:
1888
      return
1889

    
1890
    node = ninfo.name
1891
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1892

    
1893
    # checks vg existence and size > 20G
1894
    vglist = nresult.get(constants.NV_VGLIST, None)
1895
    test = not vglist
1896
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1897
    if not test:
1898
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1899
                                            constants.MIN_VG_SIZE)
1900
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1901

    
1902
    # check pv names
1903
    pvlist = nresult.get(constants.NV_PVLIST, None)
1904
    test = pvlist is None
1905
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
1906
    if not test:
1907
      # check that ':' is not present in PV names, since it's a
1908
      # special character for lvcreate (denotes the range of PEs to
1909
      # use on the PV)
1910
      for _, pvname, owner_vg in pvlist:
1911
        test = ":" in pvname
1912
        _ErrorIf(test, constants.CV_ENODELVM, node,
1913
                 "Invalid character ':' in PV '%s' of VG '%s'",
1914
                 pvname, owner_vg)
1915

    
1916
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1917
    """Check the node bridges.
1918

1919
    @type ninfo: L{objects.Node}
1920
    @param ninfo: the node to check
1921
    @param nresult: the remote results for the node
1922
    @param bridges: the expected list of bridges
1923

1924
    """
1925
    if not bridges:
1926
      return
1927

    
1928
    node = ninfo.name
1929
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1930

    
1931
    missing = nresult.get(constants.NV_BRIDGES, None)
1932
    test = not isinstance(missing, list)
1933
    _ErrorIf(test, constants.CV_ENODENET, node,
1934
             "did not return valid bridge information")
1935
    if not test:
1936
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1937
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1938

    
1939
  def _VerifyNodeNetwork(self, ninfo, nresult):
1940
    """Check the node network connectivity results.
1941

1942
    @type ninfo: L{objects.Node}
1943
    @param ninfo: the node to check
1944
    @param nresult: the remote results for the node
1945

1946
    """
1947
    node = ninfo.name
1948
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1949

    
1950
    test = constants.NV_NODELIST not in nresult
1951
    _ErrorIf(test, constants.CV_ENODESSH, node,
1952
             "node hasn't returned node ssh connectivity data")
1953
    if not test:
1954
      if nresult[constants.NV_NODELIST]:
1955
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1956
          _ErrorIf(True, constants.CV_ENODESSH, node,
1957
                   "ssh communication with node '%s': %s", a_node, a_msg)
1958

    
1959
    test = constants.NV_NODENETTEST not in nresult
1960
    _ErrorIf(test, constants.CV_ENODENET, node,
1961
             "node hasn't returned node tcp connectivity data")
1962
    if not test:
1963
      if nresult[constants.NV_NODENETTEST]:
1964
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1965
        for anode in nlist:
1966
          _ErrorIf(True, constants.CV_ENODENET, node,
1967
                   "tcp communication with node '%s': %s",
1968
                   anode, nresult[constants.NV_NODENETTEST][anode])
1969

    
1970
    test = constants.NV_MASTERIP not in nresult
1971
    _ErrorIf(test, constants.CV_ENODENET, node,
1972
             "node hasn't returned node master IP reachability data")
1973
    if not test:
1974
      if not nresult[constants.NV_MASTERIP]:
1975
        if node == self.master_node:
1976
          msg = "the master node cannot reach the master IP (not configured?)"
1977
        else:
1978
          msg = "cannot reach the master IP"
1979
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1980

    
1981
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1982
                      diskstatus):
1983
    """Verify an instance.
1984

1985
    This function checks to see if the required block devices are
1986
    available on the instance's node.
1987

1988
    """
1989
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1990
    node_current = instanceconfig.primary_node
1991

    
1992
    node_vol_should = {}
1993
    instanceconfig.MapLVsByNode(node_vol_should)
1994

    
1995
    for node in node_vol_should:
1996
      n_img = node_image[node]
1997
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1998
        # ignore missing volumes on offline or broken nodes
1999
        continue
2000
      for volume in node_vol_should[node]:
2001
        test = volume not in n_img.volumes
2002
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
2003
                 "volume %s missing on node %s", volume, node)
2004

    
2005
    if instanceconfig.admin_up:
2006
      pri_img = node_image[node_current]
2007
      test = instance not in pri_img.instances and not pri_img.offline
2008
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2009
               "instance not running on its primary node %s",
2010
               node_current)
2011

    
2012
    diskdata = [(nname, success, status, idx)
2013
                for (nname, disks) in diskstatus.items()
2014
                for idx, (success, status) in enumerate(disks)]
2015

    
2016
    for nname, success, bdev_status, idx in diskdata:
2017
      # the 'ghost node' construction in Exec() ensures that we have a
2018
      # node here
2019
      snode = node_image[nname]
2020
      bad_snode = snode.ghost or snode.offline
2021
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
2022
               constants.CV_EINSTANCEFAULTYDISK, instance,
2023
               "couldn't retrieve status for disk/%s on %s: %s",
2024
               idx, nname, bdev_status)
2025
      _ErrorIf((instanceconfig.admin_up and success and
2026
                bdev_status.ldisk_status == constants.LDS_FAULTY),
2027
               constants.CV_EINSTANCEFAULTYDISK, instance,
2028
               "disk/%s on %s is faulty", idx, nname)
2029

    
2030
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2031
    """Verify if there are any unknown volumes in the cluster.
2032

2033
    The .os, .swap and backup volumes are ignored. All other volumes are
2034
    reported as unknown.
2035

2036
    @type reserved: L{ganeti.utils.FieldSet}
2037
    @param reserved: a FieldSet of reserved volume names
2038

2039
    """
2040
    for node, n_img in node_image.items():
2041
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2042
        # skip non-healthy nodes
2043
        continue
2044
      for volume in n_img.volumes:
2045
        test = ((node not in node_vol_should or
2046
                volume not in node_vol_should[node]) and
2047
                not reserved.Matches(volume))
2048
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2049
                      "volume %s is unknown", volume)
2050

    
2051
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2052
    """Verify N+1 Memory Resilience.
2053

2054
    Check that if one single node dies we can still start all the
2055
    instances it was primary for.
2056

2057
    """
2058
    cluster_info = self.cfg.GetClusterInfo()
2059
    for node, n_img in node_image.items():
2060
      # This code checks that every node which is now listed as
2061
      # secondary has enough memory to host all instances it is
2062
      # supposed to should a single other node in the cluster fail.
2063
      # FIXME: not ready for failover to an arbitrary node
2064
      # FIXME: does not support file-backed instances
2065
      # WARNING: we currently take into account down instances as well
2066
      # as up ones, considering that even if they're down someone
2067
      # might want to start them even in the event of a node failure.
2068
      if n_img.offline:
2069
        # we're skipping offline nodes from the N+1 warning, since
2070
        # most likely we don't have good memory infromation from them;
2071
        # we already list instances living on such nodes, and that's
2072
        # enough warning
2073
        continue
2074
      for prinode, instances in n_img.sbp.items():
2075
        needed_mem = 0
2076
        for instance in instances:
2077
          bep = cluster_info.FillBE(instance_cfg[instance])
2078
          if bep[constants.BE_AUTO_BALANCE]:
2079
            needed_mem += bep[constants.BE_MEMORY]
2080
        test = n_img.mfree < needed_mem
2081
        self._ErrorIf(test, constants.CV_ENODEN1, node,
2082
                      "not enough memory to accomodate instance failovers"
2083
                      " should node %s fail (%dMiB needed, %dMiB available)",
2084
                      prinode, needed_mem, n_img.mfree)
2085

    
2086
  @classmethod
2087
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2088
                   (files_all, files_opt, files_mc, files_vm)):
2089
    """Verifies file checksums collected from all nodes.
2090

2091
    @param errorif: Callback for reporting errors
2092
    @param nodeinfo: List of L{objects.Node} objects
2093
    @param master_node: Name of master node
2094
    @param all_nvinfo: RPC results
2095

2096
    """
2097
    # Define functions determining which nodes to consider for a file
2098
    files2nodefn = [
2099
      (files_all, None),
2100
      (files_mc, lambda node: (node.master_candidate or
2101
                               node.name == master_node)),
2102
      (files_vm, lambda node: node.vm_capable),
2103
      ]
2104

    
2105
    # Build mapping from filename to list of nodes which should have the file
2106
    nodefiles = {}
2107
    for (files, fn) in files2nodefn:
2108
      if fn is None:
2109
        filenodes = nodeinfo
2110
      else:
2111
        filenodes = filter(fn, nodeinfo)
2112
      nodefiles.update((filename,
2113
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2114
                       for filename in files)
2115

    
2116
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2117

    
2118
    fileinfo = dict((filename, {}) for filename in nodefiles)
2119
    ignore_nodes = set()
2120

    
2121
    for node in nodeinfo:
2122
      if node.offline:
2123
        ignore_nodes.add(node.name)
2124
        continue
2125

    
2126
      nresult = all_nvinfo[node.name]
2127

    
2128
      if nresult.fail_msg or not nresult.payload:
2129
        node_files = None
2130
      else:
2131
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2132

    
2133
      test = not (node_files and isinstance(node_files, dict))
2134
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
2135
              "Node did not return file checksum data")
2136
      if test:
2137
        ignore_nodes.add(node.name)
2138
        continue
2139

    
2140
      # Build per-checksum mapping from filename to nodes having it
2141
      for (filename, checksum) in node_files.items():
2142
        assert filename in nodefiles
2143
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2144

    
2145
    for (filename, checksums) in fileinfo.items():
2146
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2147

    
2148
      # Nodes having the file
2149
      with_file = frozenset(node_name
2150
                            for nodes in fileinfo[filename].values()
2151
                            for node_name in nodes) - ignore_nodes
2152

    
2153
      expected_nodes = nodefiles[filename] - ignore_nodes
2154

    
2155
      # Nodes missing file
2156
      missing_file = expected_nodes - with_file
2157

    
2158
      if filename in files_opt:
2159
        # All or no nodes
2160
        errorif(missing_file and missing_file != expected_nodes,
2161
                constants.CV_ECLUSTERFILECHECK, None,
2162
                "File %s is optional, but it must exist on all or no"
2163
                " nodes (not found on %s)",
2164
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2165
      else:
2166
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2167
                "File %s is missing from node(s) %s", filename,
2168
                utils.CommaJoin(utils.NiceSort(missing_file)))
2169

    
2170
        # Warn if a node has a file it shouldn't
2171
        unexpected = with_file - expected_nodes
2172
        errorif(unexpected,
2173
                constants.CV_ECLUSTERFILECHECK, None,
2174
                "File %s should not exist on node(s) %s",
2175
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2176

    
2177
      # See if there are multiple versions of the file
2178
      test = len(checksums) > 1
2179
      if test:
2180
        variants = ["variant %s on %s" %
2181
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2182
                    for (idx, (checksum, nodes)) in
2183
                      enumerate(sorted(checksums.items()))]
2184
      else:
2185
        variants = []
2186

    
2187
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2188
              "File %s found with %s different checksums (%s)",
2189
              filename, len(checksums), "; ".join(variants))
2190

    
2191
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2192
                      drbd_map):
2193
    """Verifies and the node DRBD status.
2194

2195
    @type ninfo: L{objects.Node}
2196
    @param ninfo: the node to check
2197
    @param nresult: the remote results for the node
2198
    @param instanceinfo: the dict of instances
2199
    @param drbd_helper: the configured DRBD usermode helper
2200
    @param drbd_map: the DRBD map as returned by
2201
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2202

2203
    """
2204
    node = ninfo.name
2205
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2206

    
2207
    if drbd_helper:
2208
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2209
      test = (helper_result == None)
2210
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2211
               "no drbd usermode helper returned")
2212
      if helper_result:
2213
        status, payload = helper_result
2214
        test = not status
2215
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2216
                 "drbd usermode helper check unsuccessful: %s", payload)
2217
        test = status and (payload != drbd_helper)
2218
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2219
                 "wrong drbd usermode helper: %s", payload)
2220

    
2221
    # compute the DRBD minors
2222
    node_drbd = {}
2223
    for minor, instance in drbd_map[node].items():
2224
      test = instance not in instanceinfo
2225
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2226
               "ghost instance '%s' in temporary DRBD map", instance)
2227
        # ghost instance should not be running, but otherwise we
2228
        # don't give double warnings (both ghost instance and
2229
        # unallocated minor in use)
2230
      if test:
2231
        node_drbd[minor] = (instance, False)
2232
      else:
2233
        instance = instanceinfo[instance]
2234
        node_drbd[minor] = (instance.name, instance.admin_up)
2235

    
2236
    # and now check them
2237
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2238
    test = not isinstance(used_minors, (tuple, list))
2239
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2240
             "cannot parse drbd status file: %s", str(used_minors))
2241
    if test:
2242
      # we cannot check drbd status
2243
      return
2244

    
2245
    for minor, (iname, must_exist) in node_drbd.items():
2246
      test = minor not in used_minors and must_exist
2247
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2248
               "drbd minor %d of instance %s is not active", minor, iname)
2249
    for minor in used_minors:
2250
      test = minor not in node_drbd
2251
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2252
               "unallocated drbd minor %d is in use", minor)
2253

    
2254
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2255
    """Builds the node OS structures.
2256

2257
    @type ninfo: L{objects.Node}
2258
    @param ninfo: the node to check
2259
    @param nresult: the remote results for the node
2260
    @param nimg: the node image object
2261

2262
    """
2263
    node = ninfo.name
2264
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2265

    
2266
    remote_os = nresult.get(constants.NV_OSLIST, None)
2267
    test = (not isinstance(remote_os, list) or
2268
            not compat.all(isinstance(v, list) and len(v) == 7
2269
                           for v in remote_os))
2270

    
2271
    _ErrorIf(test, constants.CV_ENODEOS, node,
2272
             "node hasn't returned valid OS data")
2273

    
2274
    nimg.os_fail = test
2275

    
2276
    if test:
2277
      return
2278

    
2279
    os_dict = {}
2280

    
2281
    for (name, os_path, status, diagnose,
2282
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2283

    
2284
      if name not in os_dict:
2285
        os_dict[name] = []
2286

    
2287
      # parameters is a list of lists instead of list of tuples due to
2288
      # JSON lacking a real tuple type, fix it:
2289
      parameters = [tuple(v) for v in parameters]
2290
      os_dict[name].append((os_path, status, diagnose,
2291
                            set(variants), set(parameters), set(api_ver)))
2292

    
2293
    nimg.oslist = os_dict
2294

    
2295
  def _VerifyNodeOS(self, ninfo, nimg, base):
2296
    """Verifies the node OS list.
2297

2298
    @type ninfo: L{objects.Node}
2299
    @param ninfo: the node to check
2300
    @param nimg: the node image object
2301
    @param base: the 'template' node we match against (e.g. from the master)
2302

2303
    """
2304
    node = ninfo.name
2305
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2306

    
2307
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2308

    
2309
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2310
    for os_name, os_data in nimg.oslist.items():
2311
      assert os_data, "Empty OS status for OS %s?!" % os_name
2312
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2313
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2314
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2315
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2316
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2317
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2318
      # comparisons with the 'base' image
2319
      test = os_name not in base.oslist
2320
      _ErrorIf(test, constants.CV_ENODEOS, node,
2321
               "Extra OS %s not present on reference node (%s)",
2322
               os_name, base.name)
2323
      if test:
2324
        continue
2325
      assert base.oslist[os_name], "Base node has empty OS status?"
2326
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2327
      if not b_status:
2328
        # base OS is invalid, skipping
2329
        continue
2330
      for kind, a, b in [("API version", f_api, b_api),
2331
                         ("variants list", f_var, b_var),
2332
                         ("parameters", beautify_params(f_param),
2333
                          beautify_params(b_param))]:
2334
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2335
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2336
                 kind, os_name, base.name,
2337
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2338

    
2339
    # check any missing OSes
2340
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2341
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2342
             "OSes present on reference node %s but missing on this node: %s",
2343
             base.name, utils.CommaJoin(missing))
2344

    
2345
  def _VerifyOob(self, ninfo, nresult):
2346
    """Verifies out of band functionality of a node.
2347

2348
    @type ninfo: L{objects.Node}
2349
    @param ninfo: the node to check
2350
    @param nresult: the remote results for the node
2351

2352
    """
2353
    node = ninfo.name
2354
    # We just have to verify the paths on master and/or master candidates
2355
    # as the oob helper is invoked on the master
2356
    if ((ninfo.master_candidate or ninfo.master_capable) and
2357
        constants.NV_OOB_PATHS in nresult):
2358
      for path_result in nresult[constants.NV_OOB_PATHS]:
2359
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2360

    
2361
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2362
    """Verifies and updates the node volume data.
2363

2364
    This function will update a L{NodeImage}'s internal structures
2365
    with data from the remote call.
2366

2367
    @type ninfo: L{objects.Node}
2368
    @param ninfo: the node to check
2369
    @param nresult: the remote results for the node
2370
    @param nimg: the node image object
2371
    @param vg_name: the configured VG name
2372

2373
    """
2374
    node = ninfo.name
2375
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2376

    
2377
    nimg.lvm_fail = True
2378
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2379
    if vg_name is None:
2380
      pass
2381
    elif isinstance(lvdata, basestring):
2382
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2383
               utils.SafeEncode(lvdata))
2384
    elif not isinstance(lvdata, dict):
2385
      _ErrorIf(True, constants.CV_ENODELVM, node,
2386
               "rpc call to node failed (lvlist)")
2387
    else:
2388
      nimg.volumes = lvdata
2389
      nimg.lvm_fail = False
2390

    
2391
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2392
    """Verifies and updates the node instance list.
2393

2394
    If the listing was successful, then updates this node's instance
2395
    list. Otherwise, it marks the RPC call as failed for the instance
2396
    list key.
2397

2398
    @type ninfo: L{objects.Node}
2399
    @param ninfo: the node to check
2400
    @param nresult: the remote results for the node
2401
    @param nimg: the node image object
2402

2403
    """
2404
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2405
    test = not isinstance(idata, list)
2406
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2407
                  "rpc call to node failed (instancelist): %s",
2408
                  utils.SafeEncode(str(idata)))
2409
    if test:
2410
      nimg.hyp_fail = True
2411
    else:
2412
      nimg.instances = idata
2413

    
2414
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2415
    """Verifies and computes a node information map
2416

2417
    @type ninfo: L{objects.Node}
2418
    @param ninfo: the node to check
2419
    @param nresult: the remote results for the node
2420
    @param nimg: the node image object
2421
    @param vg_name: the configured VG name
2422

2423
    """
2424
    node = ninfo.name
2425
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2426

    
2427
    # try to read free memory (from the hypervisor)
2428
    hv_info = nresult.get(constants.NV_HVINFO, None)
2429
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2430
    _ErrorIf(test, constants.CV_ENODEHV, node,
2431
             "rpc call to node failed (hvinfo)")
2432
    if not test:
2433
      try:
2434
        nimg.mfree = int(hv_info["memory_free"])
2435
      except (ValueError, TypeError):
2436
        _ErrorIf(True, constants.CV_ENODERPC, node,
2437
                 "node returned invalid nodeinfo, check hypervisor")
2438

    
2439
    # FIXME: devise a free space model for file based instances as well
2440
    if vg_name is not None:
2441
      test = (constants.NV_VGLIST not in nresult or
2442
              vg_name not in nresult[constants.NV_VGLIST])
2443
      _ErrorIf(test, constants.CV_ENODELVM, node,
2444
               "node didn't return data for the volume group '%s'"
2445
               " - it is either missing or broken", vg_name)
2446
      if not test:
2447
        try:
2448
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2449
        except (ValueError, TypeError):
2450
          _ErrorIf(True, constants.CV_ENODERPC, node,
2451
                   "node returned invalid LVM info, check LVM status")
2452

    
2453
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2454
    """Gets per-disk status information for all instances.
2455

2456
    @type nodelist: list of strings
2457
    @param nodelist: Node names
2458
    @type node_image: dict of (name, L{objects.Node})
2459
    @param node_image: Node objects
2460
    @type instanceinfo: dict of (name, L{objects.Instance})
2461
    @param instanceinfo: Instance objects
2462
    @rtype: {instance: {node: [(succes, payload)]}}
2463
    @return: a dictionary of per-instance dictionaries with nodes as
2464
        keys and disk information as values; the disk information is a
2465
        list of tuples (success, payload)
2466

2467
    """
2468
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2469

    
2470
    node_disks = {}
2471
    node_disks_devonly = {}
2472
    diskless_instances = set()
2473
    diskless = constants.DT_DISKLESS
2474

    
2475
    for nname in nodelist:
2476
      node_instances = list(itertools.chain(node_image[nname].pinst,
2477
                                            node_image[nname].sinst))
2478
      diskless_instances.update(inst for inst in node_instances
2479
                                if instanceinfo[inst].disk_template == diskless)
2480
      disks = [(inst, disk)
2481
               for inst in node_instances
2482
               for disk in instanceinfo[inst].disks]
2483

    
2484
      if not disks:
2485
        # No need to collect data
2486
        continue
2487

    
2488
      node_disks[nname] = disks
2489

    
2490
      # Creating copies as SetDiskID below will modify the objects and that can
2491
      # lead to incorrect data returned from nodes
2492
      devonly = [dev.Copy() for (_, dev) in disks]
2493

    
2494
      for dev in devonly:
2495
        self.cfg.SetDiskID(dev, nname)
2496

    
2497
      node_disks_devonly[nname] = devonly
2498

    
2499
    assert len(node_disks) == len(node_disks_devonly)
2500

    
2501
    # Collect data from all nodes with disks
2502
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2503
                                                          node_disks_devonly)
2504

    
2505
    assert len(result) == len(node_disks)
2506

    
2507
    instdisk = {}
2508

    
2509
    for (nname, nres) in result.items():
2510
      disks = node_disks[nname]
2511

    
2512
      if nres.offline:
2513
        # No data from this node
2514
        data = len(disks) * [(False, "node offline")]
2515
      else:
2516
        msg = nres.fail_msg
2517
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2518
                 "while getting disk information: %s", msg)
2519
        if msg:
2520
          # No data from this node
2521
          data = len(disks) * [(False, msg)]
2522
        else:
2523
          data = []
2524
          for idx, i in enumerate(nres.payload):
2525
            if isinstance(i, (tuple, list)) and len(i) == 2:
2526
              data.append(i)
2527
            else:
2528
              logging.warning("Invalid result from node %s, entry %d: %s",
2529
                              nname, idx, i)
2530
              data.append((False, "Invalid result from the remote node"))
2531

    
2532
      for ((inst, _), status) in zip(disks, data):
2533
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2534

    
2535
    # Add empty entries for diskless instances.
2536
    for inst in diskless_instances:
2537
      assert inst not in instdisk
2538
      instdisk[inst] = {}
2539

    
2540
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2541
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2542
                      compat.all(isinstance(s, (tuple, list)) and
2543
                                 len(s) == 2 for s in statuses)
2544
                      for inst, nnames in instdisk.items()
2545
                      for nname, statuses in nnames.items())
2546
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2547

    
2548
    return instdisk
2549

    
2550
  @staticmethod
2551
  def _SshNodeSelector(group_uuid, all_nodes):
2552
    """Create endless iterators for all potential SSH check hosts.
2553

2554
    """
2555
    nodes = [node for node in all_nodes
2556
             if (node.group != group_uuid and
2557
                 not node.offline)]
2558
    keyfunc = operator.attrgetter("group")
2559

    
2560
    return map(itertools.cycle,
2561
               [sorted(map(operator.attrgetter("name"), names))
2562
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2563
                                                  keyfunc)])
2564

    
2565
  @classmethod
2566
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2567
    """Choose which nodes should talk to which other nodes.
2568

2569
    We will make nodes contact all nodes in their group, and one node from
2570
    every other group.
2571

2572
    @warning: This algorithm has a known issue if one node group is much
2573
      smaller than others (e.g. just one node). In such a case all other
2574
      nodes will talk to the single node.
2575

2576
    """
2577
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2578
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2579

    
2580
    return (online_nodes,
2581
            dict((name, sorted([i.next() for i in sel]))
2582
                 for name in online_nodes))
2583

    
2584
  def BuildHooksEnv(self):
2585
    """Build hooks env.
2586

2587
    Cluster-Verify hooks just ran in the post phase and their failure makes
2588
    the output be logged in the verify output and the verification to fail.
2589

2590
    """
2591
    env = {
2592
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2593
      }
2594

    
2595
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2596
               for node in self.my_node_info.values())
2597

    
2598
    return env
2599

    
2600
  def BuildHooksNodes(self):
2601
    """Build hooks nodes.
2602

2603
    """
2604
    return ([], self.my_node_names)
2605

    
2606
  def Exec(self, feedback_fn):
2607
    """Verify integrity of the node group, performing various test on nodes.
2608

2609
    """
2610
    # This method has too many local variables. pylint: disable=R0914
2611
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2612

    
2613
    if not self.my_node_names:
2614
      # empty node group
2615
      feedback_fn("* Empty node group, skipping verification")
2616
      return True
2617

    
2618
    self.bad = False
2619
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2620
    verbose = self.op.verbose
2621
    self._feedback_fn = feedback_fn
2622

    
2623
    vg_name = self.cfg.GetVGName()
2624
    drbd_helper = self.cfg.GetDRBDHelper()
2625
    cluster = self.cfg.GetClusterInfo()
2626
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2627
    hypervisors = cluster.enabled_hypervisors
2628
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2629

    
2630
    i_non_redundant = [] # Non redundant instances
2631
    i_non_a_balanced = [] # Non auto-balanced instances
2632
    n_offline = 0 # Count of offline nodes
2633
    n_drained = 0 # Count of nodes being drained
2634
    node_vol_should = {}
2635

    
2636
    # FIXME: verify OS list
2637

    
2638
    # File verification
2639
    filemap = _ComputeAncillaryFiles(cluster, False)
2640

    
2641
    # do local checksums
2642
    master_node = self.master_node = self.cfg.GetMasterNode()
2643
    master_ip = self.cfg.GetMasterIP()
2644

    
2645
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2646

    
2647
    node_verify_param = {
2648
      constants.NV_FILELIST:
2649
        utils.UniqueSequence(filename
2650
                             for files in filemap
2651
                             for filename in files),
2652
      constants.NV_NODELIST:
2653
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2654
                                  self.all_node_info.values()),
2655
      constants.NV_HYPERVISOR: hypervisors,
2656
      constants.NV_HVPARAMS:
2657
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2658
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2659
                                 for node in node_data_list
2660
                                 if not node.offline],
2661
      constants.NV_INSTANCELIST: hypervisors,
2662
      constants.NV_VERSION: None,
2663
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2664
      constants.NV_NODESETUP: None,
2665
      constants.NV_TIME: None,
2666
      constants.NV_MASTERIP: (master_node, master_ip),
2667
      constants.NV_OSLIST: None,
2668
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2669
      }
2670

    
2671
    if vg_name is not None:
2672
      node_verify_param[constants.NV_VGLIST] = None
2673
      node_verify_param[constants.NV_LVLIST] = vg_name
2674
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2675
      node_verify_param[constants.NV_DRBDLIST] = None
2676

    
2677
    if drbd_helper:
2678
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2679

    
2680
    # bridge checks
2681
    # FIXME: this needs to be changed per node-group, not cluster-wide
2682
    bridges = set()
2683
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2684
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2685
      bridges.add(default_nicpp[constants.NIC_LINK])
2686
    for instance in self.my_inst_info.values():
2687
      for nic in instance.nics:
2688
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2689
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2690
          bridges.add(full_nic[constants.NIC_LINK])
2691

    
2692
    if bridges:
2693
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2694

    
2695
    # Build our expected cluster state
2696
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2697
                                                 name=node.name,
2698
                                                 vm_capable=node.vm_capable))
2699
                      for node in node_data_list)
2700

    
2701
    # Gather OOB paths
2702
    oob_paths = []
2703
    for node in self.all_node_info.values():
2704
      path = _SupportsOob(self.cfg, node)
2705
      if path and path not in oob_paths:
2706
        oob_paths.append(path)
2707

    
2708
    if oob_paths:
2709
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2710

    
2711
    for instance in self.my_inst_names:
2712
      inst_config = self.my_inst_info[instance]
2713

    
2714
      for nname in inst_config.all_nodes:
2715
        if nname not in node_image:
2716
          gnode = self.NodeImage(name=nname)
2717
          gnode.ghost = (nname not in self.all_node_info)
2718
          node_image[nname] = gnode
2719

    
2720
      inst_config.MapLVsByNode(node_vol_should)
2721

    
2722
      pnode = inst_config.primary_node
2723
      node_image[pnode].pinst.append(instance)
2724

    
2725
      for snode in inst_config.secondary_nodes:
2726
        nimg = node_image[snode]
2727
        nimg.sinst.append(instance)
2728
        if pnode not in nimg.sbp:
2729
          nimg.sbp[pnode] = []
2730
        nimg.sbp[pnode].append(instance)
2731

    
2732
    # At this point, we have the in-memory data structures complete,
2733
    # except for the runtime information, which we'll gather next
2734

    
2735
    # Due to the way our RPC system works, exact response times cannot be
2736
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2737
    # time before and after executing the request, we can at least have a time
2738
    # window.
2739
    nvinfo_starttime = time.time()
2740
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2741
                                           node_verify_param,
2742
                                           self.cfg.GetClusterName())
2743
    nvinfo_endtime = time.time()
2744

    
2745
    if self.extra_lv_nodes and vg_name is not None:
2746
      extra_lv_nvinfo = \
2747
          self.rpc.call_node_verify(self.extra_lv_nodes,
2748
                                    {constants.NV_LVLIST: vg_name},
2749
                                    self.cfg.GetClusterName())
2750
    else:
2751
      extra_lv_nvinfo = {}
2752

    
2753
    all_drbd_map = self.cfg.ComputeDRBDMap()
2754

    
2755
    feedback_fn("* Gathering disk information (%s nodes)" %
2756
                len(self.my_node_names))
2757
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2758
                                     self.my_inst_info)
2759

    
2760
    feedback_fn("* Verifying configuration file consistency")
2761

    
2762
    # If not all nodes are being checked, we need to make sure the master node
2763
    # and a non-checked vm_capable node are in the list.
2764
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2765
    if absent_nodes:
2766
      vf_nvinfo = all_nvinfo.copy()
2767
      vf_node_info = list(self.my_node_info.values())
2768
      additional_nodes = []
2769
      if master_node not in self.my_node_info:
2770
        additional_nodes.append(master_node)
2771
        vf_node_info.append(self.all_node_info[master_node])
2772
      # Add the first vm_capable node we find which is not included
2773
      for node in absent_nodes:
2774
        nodeinfo = self.all_node_info[node]
2775
        if nodeinfo.vm_capable and not nodeinfo.offline:
2776
          additional_nodes.append(node)
2777
          vf_node_info.append(self.all_node_info[node])
2778
          break
2779
      key = constants.NV_FILELIST
2780
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2781
                                                 {key: node_verify_param[key]},
2782
                                                 self.cfg.GetClusterName()))
2783
    else:
2784
      vf_nvinfo = all_nvinfo
2785
      vf_node_info = self.my_node_info.values()
2786

    
2787
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2788

    
2789
    feedback_fn("* Verifying node status")
2790

    
2791
    refos_img = None
2792

    
2793
    for node_i in node_data_list:
2794
      node = node_i.name
2795
      nimg = node_image[node]
2796

    
2797
      if node_i.offline:
2798
        if verbose:
2799
          feedback_fn("* Skipping offline node %s" % (node,))
2800
        n_offline += 1
2801
        continue
2802

    
2803
      if node == master_node:
2804
        ntype = "master"
2805
      elif node_i.master_candidate:
2806
        ntype = "master candidate"
2807
      elif node_i.drained:
2808
        ntype = "drained"
2809
        n_drained += 1
2810
      else:
2811
        ntype = "regular"
2812
      if verbose:
2813
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2814

    
2815
      msg = all_nvinfo[node].fail_msg
2816
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2817
               msg)
2818
      if msg:
2819
        nimg.rpc_fail = True
2820
        continue
2821

    
2822
      nresult = all_nvinfo[node].payload
2823

    
2824
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2825
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2826
      self._VerifyNodeNetwork(node_i, nresult)
2827
      self._VerifyOob(node_i, nresult)
2828

    
2829
      if nimg.vm_capable:
2830
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2831
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2832
                             all_drbd_map)
2833

    
2834
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2835
        self._UpdateNodeInstances(node_i, nresult, nimg)
2836
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2837
        self._UpdateNodeOS(node_i, nresult, nimg)
2838

    
2839
        if not nimg.os_fail:
2840
          if refos_img is None:
2841
            refos_img = nimg
2842
          self._VerifyNodeOS(node_i, nimg, refos_img)
2843
        self._VerifyNodeBridges(node_i, nresult, bridges)
2844

    
2845
        # Check whether all running instancies are primary for the node. (This
2846
        # can no longer be done from _VerifyInstance below, since some of the
2847
        # wrong instances could be from other node groups.)
2848
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2849

    
2850
        for inst in non_primary_inst:
2851
          test = inst in self.all_inst_info
2852
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2853
                   "instance should not run on node %s", node_i.name)
2854
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2855
                   "node is running unknown instance %s", inst)
2856

    
2857
    for node, result in extra_lv_nvinfo.items():
2858
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2859
                              node_image[node], vg_name)
2860

    
2861
    feedback_fn("* Verifying instance status")
2862
    for instance in self.my_inst_names:
2863
      if verbose:
2864
        feedback_fn("* Verifying instance %s" % instance)
2865
      inst_config = self.my_inst_info[instance]
2866
      self._VerifyInstance(instance, inst_config, node_image,
2867
                           instdisk[instance])
2868
      inst_nodes_offline = []
2869

    
2870
      pnode = inst_config.primary_node
2871
      pnode_img = node_image[pnode]
2872
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2873
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2874
               " primary node failed", instance)
2875

    
2876
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2877
               constants.CV_EINSTANCEBADNODE, instance,
2878
               "instance is marked as running and lives on offline node %s",
2879
               inst_config.primary_node)
2880

    
2881
      # If the instance is non-redundant we cannot survive losing its primary
2882
      # node, so we are not N+1 compliant. On the other hand we have no disk
2883
      # templates with more than one secondary so that situation is not well
2884
      # supported either.
2885
      # FIXME: does not support file-backed instances
2886
      if not inst_config.secondary_nodes:
2887
        i_non_redundant.append(instance)
2888

    
2889
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2890
               constants.CV_EINSTANCELAYOUT,
2891
               instance, "instance has multiple secondary nodes: %s",
2892
               utils.CommaJoin(inst_config.secondary_nodes),
2893
               code=self.ETYPE_WARNING)
2894

    
2895
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2896
        pnode = inst_config.primary_node
2897
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2898
        instance_groups = {}
2899

    
2900
        for node in instance_nodes:
2901
          instance_groups.setdefault(self.all_node_info[node].group,
2902
                                     []).append(node)
2903

    
2904
        pretty_list = [
2905
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2906
          # Sort so that we always list the primary node first.
2907
          for group, nodes in sorted(instance_groups.items(),
2908
                                     key=lambda (_, nodes): pnode in nodes,
2909
                                     reverse=True)]
2910

    
2911
        self._ErrorIf(len(instance_groups) > 1,
2912
                      constants.CV_EINSTANCESPLITGROUPS,
2913
                      instance, "instance has primary and secondary nodes in"
2914
                      " different groups: %s", utils.CommaJoin(pretty_list),
2915
                      code=self.ETYPE_WARNING)
2916

    
2917
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2918
        i_non_a_balanced.append(instance)
2919

    
2920
      for snode in inst_config.secondary_nodes:
2921
        s_img = node_image[snode]
2922
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2923
                 snode, "instance %s, connection to secondary node failed",
2924
                 instance)
2925

    
2926
        if s_img.offline:
2927
          inst_nodes_offline.append(snode)
2928

    
2929
      # warn that the instance lives on offline nodes
2930
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2931
               "instance has offline secondary node(s) %s",
2932
               utils.CommaJoin(inst_nodes_offline))
2933
      # ... or ghost/non-vm_capable nodes
2934
      for node in inst_config.all_nodes:
2935
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2936
                 instance, "instance lives on ghost node %s", node)
2937
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2938
                 instance, "instance lives on non-vm_capable node %s", node)
2939

    
2940
    feedback_fn("* Verifying orphan volumes")
2941
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2942

    
2943
    # We will get spurious "unknown volume" warnings if any node of this group
2944
    # is secondary for an instance whose primary is in another group. To avoid
2945
    # them, we find these instances and add their volumes to node_vol_should.
2946
    for inst in self.all_inst_info.values():
2947
      for secondary in inst.secondary_nodes:
2948
        if (secondary in self.my_node_info
2949
            and inst.name not in self.my_inst_info):
2950
          inst.MapLVsByNode(node_vol_should)
2951
          break
2952

    
2953
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2954

    
2955
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2956
      feedback_fn("* Verifying N+1 Memory redundancy")
2957
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2958

    
2959
    feedback_fn("* Other Notes")
2960
    if i_non_redundant:
2961
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2962
                  % len(i_non_redundant))
2963

    
2964
    if i_non_a_balanced:
2965
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2966
                  % len(i_non_a_balanced))
2967

    
2968
    if n_offline:
2969
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2970

    
2971
    if n_drained:
2972
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2973

    
2974
    return not self.bad
2975

    
2976
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2977
    """Analyze the post-hooks' result
2978

2979
    This method analyses the hook result, handles it, and sends some
2980
    nicely-formatted feedback back to the user.
2981

2982
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2983
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2984
    @param hooks_results: the results of the multi-node hooks rpc call
2985
    @param feedback_fn: function used send feedback back to the caller
2986
    @param lu_result: previous Exec result
2987
    @return: the new Exec result, based on the previous result
2988
        and hook results
2989

2990
    """
2991
    # We only really run POST phase hooks, only for non-empty groups,
2992
    # and are only interested in their results
2993
    if not self.my_node_names:
2994
      # empty node group
2995
      pass
2996
    elif phase == constants.HOOKS_PHASE_POST:
2997
      # Used to change hooks' output to proper indentation
2998
      feedback_fn("* Hooks Results")
2999
      assert hooks_results, "invalid result from hooks"
3000

    
3001
      for node_name in hooks_results:
3002
        res = hooks_results[node_name]
3003
        msg = res.fail_msg
3004
        test = msg and not res.offline
3005
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3006
                      "Communication failure in hooks execution: %s", msg)
3007
        if res.offline or msg:
3008
          # No need to investigate payload if node is offline or gave
3009
          # an error.
3010
          continue
3011
        for script, hkr, output in res.payload:
3012
          test = hkr == constants.HKR_FAIL
3013
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3014
                        "Script %s failed, output:", script)
3015
          if test:
3016
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3017
            feedback_fn("%s" % output)
3018
            lu_result = False
3019

    
3020
    return lu_result
3021

    
3022

    
3023
class LUClusterVerifyDisks(NoHooksLU):
3024
  """Verifies the cluster disks status.
3025

3026
  """
3027
  REQ_BGL = False
3028

    
3029
  def ExpandNames(self):
3030
    self.share_locks = _ShareAll()
3031
    self.needed_locks = {
3032
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3033
      }
3034

    
3035
  def Exec(self, feedback_fn):
3036
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3037

    
3038
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3039
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3040
                           for group in group_names])
3041

    
3042

    
3043
class LUGroupVerifyDisks(NoHooksLU):
3044
  """Verifies the status of all disks in a node group.
3045

3046
  """
3047
  REQ_BGL = False
3048

    
3049
  def ExpandNames(self):
3050
    # Raises errors.OpPrereqError on its own if group can't be found
3051
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3052

    
3053
    self.share_locks = _ShareAll()
3054
    self.needed_locks = {
3055
      locking.LEVEL_INSTANCE: [],
3056
      locking.LEVEL_NODEGROUP: [],
3057
      locking.LEVEL_NODE: [],
3058
      }
3059

    
3060
  def DeclareLocks(self, level):
3061
    if level == locking.LEVEL_INSTANCE:
3062
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3063

    
3064
      # Lock instances optimistically, needs verification once node and group
3065
      # locks have been acquired
3066
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3067
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3068

    
3069
    elif level == locking.LEVEL_NODEGROUP:
3070
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3071

    
3072
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3073
        set([self.group_uuid] +
3074
            # Lock all groups used by instances optimistically; this requires
3075
            # going via the node before it's locked, requiring verification
3076
            # later on
3077
            [group_uuid
3078
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3079
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3080

    
3081
    elif level == locking.LEVEL_NODE:
3082
      # This will only lock the nodes in the group to be verified which contain
3083
      # actual instances
3084
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3085
      self._LockInstancesNodes()
3086

    
3087
      # Lock all nodes in group to be verified
3088
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3089
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3090
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3091

    
3092
  def CheckPrereq(self):
3093
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3094
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3095
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3096

    
3097
    assert self.group_uuid in owned_groups
3098

    
3099
    # Check if locked instances are still correct
3100
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3101

    
3102
    # Get instance information
3103
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3104

    
3105
    # Check if node groups for locked instances are still correct
3106
    for (instance_name, inst) in self.instances.items():
3107
      assert owned_nodes.issuperset(inst.all_nodes), \
3108
        "Instance %s's nodes changed while we kept the lock" % instance_name
3109

    
3110
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3111
                                             owned_groups)
3112

    
3113
      assert self.group_uuid in inst_groups, \
3114
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3115

    
3116
  def Exec(self, feedback_fn):
3117
    """Verify integrity of cluster disks.
3118

3119
    @rtype: tuple of three items
3120
    @return: a tuple of (dict of node-to-node_error, list of instances
3121
        which need activate-disks, dict of instance: (node, volume) for
3122
        missing volumes
3123

3124
    """
3125
    res_nodes = {}
3126
    res_instances = set()
3127
    res_missing = {}
3128

    
3129
    nv_dict = _MapInstanceDisksToNodes([inst
3130
                                        for inst in self.instances.values()
3131
                                        if inst.admin_up])
3132

    
3133
    if nv_dict:
3134
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3135
                             set(self.cfg.GetVmCapableNodeList()))
3136

    
3137
      node_lvs = self.rpc.call_lv_list(nodes, [])
3138

    
3139
      for (node, node_res) in node_lvs.items():
3140
        if node_res.offline:
3141
          continue
3142

    
3143
        msg = node_res.fail_msg
3144
        if msg:
3145
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3146
          res_nodes[node] = msg
3147
          continue
3148

    
3149
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3150
          inst = nv_dict.pop((node, lv_name), None)
3151
          if not (lv_online or inst is None):
3152
            res_instances.add(inst)
3153

    
3154
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3155
      # better
3156
      for key, inst in nv_dict.iteritems():
3157
        res_missing.setdefault(inst, []).append(list(key))
3158

    
3159
    return (res_nodes, list(res_instances), res_missing)
3160

    
3161

    
3162
class LUClusterRepairDiskSizes(NoHooksLU):
3163
  """Verifies the cluster disks sizes.
3164

3165
  """
3166
  REQ_BGL = False
3167

    
3168
  def ExpandNames(self):
3169
    if self.op.instances:
3170
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3171
      self.needed_locks = {
3172
        locking.LEVEL_NODE: [],
3173
        locking.LEVEL_INSTANCE: self.wanted_names,
3174
        }
3175
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3176
    else:
3177
      self.wanted_names = None
3178
      self.needed_locks = {
3179
        locking.LEVEL_NODE: locking.ALL_SET,
3180
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3181
        }
3182
    self.share_locks = _ShareAll()
3183

    
3184
  def DeclareLocks(self, level):
3185
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
3186
      self._LockInstancesNodes(primary_only=True)
3187

    
3188
  def CheckPrereq(self):
3189
    """Check prerequisites.
3190

3191
    This only checks the optional instance list against the existing names.
3192

3193
    """
3194
    if self.wanted_names is None:
3195
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3196

    
3197
    self.wanted_instances = \
3198
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3199

    
3200
  def _EnsureChildSizes(self, disk):
3201
    """Ensure children of the disk have the needed disk size.
3202

3203
    This is valid mainly for DRBD8 and fixes an issue where the
3204
    children have smaller disk size.
3205

3206
    @param disk: an L{ganeti.objects.Disk} object
3207

3208
    """
3209
    if disk.dev_type == constants.LD_DRBD8:
3210
      assert disk.children, "Empty children for DRBD8?"
3211
      fchild = disk.children[0]
3212
      mismatch = fchild.size < disk.size
3213
      if mismatch:
3214
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3215
                     fchild.size, disk.size)
3216
        fchild.size = disk.size
3217

    
3218
      # and we recurse on this child only, not on the metadev
3219
      return self._EnsureChildSizes(fchild) or mismatch
3220
    else:
3221
      return False
3222

    
3223
  def Exec(self, feedback_fn):
3224
    """Verify the size of cluster disks.
3225

3226
    """
3227
    # TODO: check child disks too
3228
    # TODO: check differences in size between primary/secondary nodes
3229
    per_node_disks = {}
3230
    for instance in self.wanted_instances:
3231
      pnode = instance.primary_node
3232
      if pnode not in per_node_disks:
3233
        per_node_disks[pnode] = []
3234
      for idx, disk in enumerate(instance.disks):
3235
        per_node_disks[pnode].append((instance, idx, disk))
3236

    
3237
    changed = []
3238
    for node, dskl in per_node_disks.items():
3239
      newl = [v[2].Copy() for v in dskl]
3240
      for dsk in newl:
3241
        self.cfg.SetDiskID(dsk, node)
3242
      result = self.rpc.call_blockdev_getsize(node, newl)
3243
      if result.fail_msg:
3244
        self.LogWarning("Failure in blockdev_getsize call to node"
3245
                        " %s, ignoring", node)
3246
        continue
3247
      if len(result.payload) != len(dskl):
3248
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3249
                        " result.payload=%s", node, len(dskl), result.payload)
3250
        self.LogWarning("Invalid result from node %s, ignoring node results",
3251
                        node)
3252
        continue
3253
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3254
        if size is None:
3255
          self.LogWarning("Disk %d of instance %s did not return size"
3256
                          " information, ignoring", idx, instance.name)
3257
          continue
3258
        if not isinstance(size, (int, long)):
3259
          self.LogWarning("Disk %d of instance %s did not return valid"
3260
                          " size information, ignoring", idx, instance.name)
3261
          continue
3262
        size = size >> 20
3263
        if size != disk.size:
3264
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3265
                       " correcting: recorded %d, actual %d", idx,
3266
                       instance.name, disk.size, size)
3267
          disk.size = size
3268
          self.cfg.Update(instance, feedback_fn)
3269
          changed.append((instance.name, idx, size))
3270
        if self._EnsureChildSizes(disk):
3271
          self.cfg.Update(instance, feedback_fn)
3272
          changed.append((instance.name, idx, disk.size))
3273
    return changed
3274

    
3275

    
3276
class LUClusterRename(LogicalUnit):
3277
  """Rename the cluster.
3278

3279
  """
3280
  HPATH = "cluster-rename"
3281
  HTYPE = constants.HTYPE_CLUSTER
3282

    
3283
  def BuildHooksEnv(self):
3284
    """Build hooks env.
3285

3286
    """
3287
    return {
3288
      "OP_TARGET": self.cfg.GetClusterName(),
3289
      "NEW_NAME": self.op.name,
3290
      }
3291

    
3292
  def BuildHooksNodes(self):
3293
    """Build hooks nodes.
3294

3295
    """
3296
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3297

    
3298
  def CheckPrereq(self):
3299
    """Verify that the passed name is a valid one.
3300

3301
    """
3302
    hostname = netutils.GetHostname(name=self.op.name,
3303
                                    family=self.cfg.GetPrimaryIPFamily())
3304

    
3305
    new_name = hostname.name
3306
    self.ip = new_ip = hostname.ip
3307
    old_name = self.cfg.GetClusterName()
3308
    old_ip = self.cfg.GetMasterIP()
3309
    if new_name == old_name and new_ip == old_ip:
3310
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3311
                                 " cluster has changed",
3312
                                 errors.ECODE_INVAL)
3313
    if new_ip != old_ip:
3314
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3315
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3316
                                   " reachable on the network" %
3317
                                   new_ip, errors.ECODE_NOTUNIQUE)
3318

    
3319
    self.op.name = new_name
3320

    
3321
  def Exec(self, feedback_fn):
3322
    """Rename the cluster.
3323

3324
    """
3325
    clustername = self.op.name
3326
    new_ip = self.ip
3327

    
3328
    # shutdown the master IP
3329
    (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3330
    result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev)
3331
    result.Raise("Could not disable the master role")
3332

    
3333
    try:
3334
      cluster = self.cfg.GetClusterInfo()
3335
      cluster.cluster_name = clustername
3336
      cluster.master_ip = new_ip
3337
      self.cfg.Update(cluster, feedback_fn)
3338

    
3339
      # update the known hosts file
3340
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3341
      node_list = self.cfg.GetOnlineNodeList()
3342
      try:
3343
        node_list.remove(master)
3344
      except ValueError:
3345
        pass
3346
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3347
    finally:
3348
      result = self.rpc.call_node_activate_master_ip(master, new_ip, netmask,
3349
                                                     dev, family)
3350
      msg = result.fail_msg
3351
      if msg:
3352
        self.LogWarning("Could not re-enable the master role on"
3353
                        " the master, please restart manually: %s", msg)
3354

    
3355
    return clustername
3356

    
3357

    
3358
def _ValidateNetmask(cfg, netmask):
3359
  """Checks if a netmask is valid.
3360

3361
  @type cfg: L{config.ConfigWriter}
3362
  @param cfg: The cluster configuration
3363
  @type netmask: int
3364
  @param netmask: the netmask to be verified
3365
  @raise errors.OpPrereqError: if the validation fails
3366

3367
  """
3368
  ip_family = cfg.GetPrimaryIPFamily()
3369
  try:
3370
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
3371
  except errors.ProgrammerError:
3372
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
3373
                               ip_family)
3374
  if not ipcls.ValidateNetmask(netmask):
3375
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
3376
                                (netmask))
3377

    
3378

    
3379
class LUClusterSetParams(LogicalUnit):
3380
  """Change the parameters of the cluster.
3381

3382
  """
3383
  HPATH = "cluster-modify"
3384
  HTYPE = constants.HTYPE_CLUSTER
3385
  REQ_BGL = False
3386

    
3387
  def CheckArguments(self):
3388
    """Check parameters
3389

3390
    """
3391
    if self.op.uid_pool:
3392
      uidpool.CheckUidPool(self.op.uid_pool)
3393

    
3394
    if self.op.add_uids:
3395
      uidpool.CheckUidPool(self.op.add_uids)
3396

    
3397
    if self.op.remove_uids:
3398
      uidpool.CheckUidPool(self.op.remove_uids)
3399

    
3400
    if self.op.master_netmask is not None:
3401
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3402

    
3403
  def ExpandNames(self):
3404
    # FIXME: in the future maybe other cluster params won't require checking on
3405
    # all nodes to be modified.
3406
    self.needed_locks = {
3407
      locking.LEVEL_NODE: locking.ALL_SET,
3408
    }
3409
    self.share_locks[locking.LEVEL_NODE] = 1
3410

    
3411
  def BuildHooksEnv(self):
3412
    """Build hooks env.
3413

3414
    """
3415
    return {
3416
      "OP_TARGET": self.cfg.GetClusterName(),
3417
      "NEW_VG_NAME": self.op.vg_name,
3418
      }
3419

    
3420
  def BuildHooksNodes(self):
3421
    """Build hooks nodes.
3422

3423
    """
3424
    mn = self.cfg.GetMasterNode()
3425
    return ([mn], [mn])
3426

    
3427
  def CheckPrereq(self):
3428
    """Check prerequisites.
3429

3430
    This checks whether the given params don't conflict and
3431
    if the given volume group is valid.
3432

3433
    """
3434
    if self.op.vg_name is not None and not self.op.vg_name:
3435
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3436
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3437
                                   " instances exist", errors.ECODE_INVAL)
3438

    
3439
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3440
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3441
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3442
                                   " drbd-based instances exist",
3443
                                   errors.ECODE_INVAL)
3444

    
3445
    node_list = self.owned_locks(locking.LEVEL_NODE)
3446

    
3447
    # if vg_name not None, checks given volume group on all nodes
3448
    if self.op.vg_name:
3449
      vglist = self.rpc.call_vg_list(node_list)
3450
      for node in node_list:
3451
        msg = vglist[node].fail_msg
3452
        if msg:
3453
          # ignoring down node
3454
          self.LogWarning("Error while gathering data on node %s"
3455
                          " (ignoring node): %s", node, msg)
3456
          continue
3457
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3458
                                              self.op.vg_name,
3459
                                              constants.MIN_VG_SIZE)
3460
        if vgstatus:
3461
          raise errors.OpPrereqError("Error on node '%s': %s" %
3462
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3463

    
3464
    if self.op.drbd_helper:
3465
      # checks given drbd helper on all nodes
3466
      helpers = self.rpc.call_drbd_helper(node_list)
3467
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3468
        if ninfo.offline:
3469
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3470
          continue
3471
        msg = helpers[node].fail_msg
3472
        if msg:
3473
          raise errors.OpPrereqError("Error checking drbd helper on node"
3474
                                     " '%s': %s" % (node, msg),
3475
                                     errors.ECODE_ENVIRON)
3476
        node_helper = helpers[node].payload
3477
        if node_helper != self.op.drbd_helper:
3478
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3479
                                     (node, node_helper), errors.ECODE_ENVIRON)
3480

    
3481
    self.cluster = cluster = self.cfg.GetClusterInfo()
3482
    # validate params changes
3483
    if self.op.beparams:
3484
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3485
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3486

    
3487
    if self.op.ndparams:
3488
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3489
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3490

    
3491
      # TODO: we need a more general way to handle resetting
3492
      # cluster-level parameters to default values
3493
      if self.new_ndparams["oob_program"] == "":
3494
        self.new_ndparams["oob_program"] = \
3495
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3496

    
3497
    if self.op.nicparams:
3498
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3499
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3500
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3501
      nic_errors = []
3502

    
3503
      # check all instances for consistency
3504
      for instance in self.cfg.GetAllInstancesInfo().values():
3505
        for nic_idx, nic in enumerate(instance.nics):
3506
          params_copy = copy.deepcopy(nic.nicparams)
3507
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3508

    
3509
          # check parameter syntax
3510
          try:
3511
            objects.NIC.CheckParameterSyntax(params_filled)
3512
          except errors.ConfigurationError, err:
3513
            nic_errors.append("Instance %s, nic/%d: %s" %
3514
                              (instance.name, nic_idx, err))
3515

    
3516
          # if we're moving instances to routed, check that they have an ip
3517
          target_mode = params_filled[constants.NIC_MODE]
3518
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3519
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3520
                              " address" % (instance.name, nic_idx))
3521
      if nic_errors:
3522
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3523
                                   "\n".join(nic_errors))
3524

    
3525
    # hypervisor list/parameters
3526
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3527
    if self.op.hvparams:
3528
      for hv_name, hv_dict in self.op.hvparams.items():
3529
        if hv_name not in self.new_hvparams:
3530
          self.new_hvparams[hv_name] = hv_dict
3531
        else:
3532
          self.new_hvparams[hv_name].update(hv_dict)
3533

    
3534
    # os hypervisor parameters
3535
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3536
    if self.op.os_hvp:
3537
      for os_name, hvs in self.op.os_hvp.items():
3538
        if os_name not in self.new_os_hvp:
3539
          self.new_os_hvp[os_name] = hvs
3540
        else:
3541
          for hv_name, hv_dict in hvs.items():
3542
            if hv_name not in self.new_os_hvp[os_name]:
3543
              self.new_os_hvp[os_name][hv_name] = hv_dict
3544
            else:
3545
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3546

    
3547
    # os parameters
3548
    self.new_osp = objects.FillDict(cluster.osparams, {})
3549
    if self.op.osparams:
3550
      for os_name, osp in self.op.osparams.items():
3551
        if os_name not in self.new_osp:
3552
          self.new_osp[os_name] = {}
3553

    
3554
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3555
                                                  use_none=True)
3556

    
3557
        if not self.new_osp[os_name]:
3558
          # we removed all parameters
3559
          del self.new_osp[os_name]
3560
        else:
3561
          # check the parameter validity (remote check)
3562
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3563
                         os_name, self.new_osp[os_name])
3564

    
3565
    # changes to the hypervisor list
3566
    if self.op.enabled_hypervisors is not None:
3567
      self.hv_list = self.op.enabled_hypervisors
3568
      for hv in self.hv_list:
3569
        # if the hypervisor doesn't already exist in the cluster
3570
        # hvparams, we initialize it to empty, and then (in both
3571
        # cases) we make sure to fill the defaults, as we might not
3572
        # have a complete defaults list if the hypervisor wasn't
3573
        # enabled before
3574
        if hv not in new_hvp:
3575
          new_hvp[hv] = {}
3576
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3577
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3578
    else:
3579
      self.hv_list = cluster.enabled_hypervisors
3580

    
3581
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3582
      # either the enabled list has changed, or the parameters have, validate
3583
      for hv_name, hv_params in self.new_hvparams.items():
3584
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3585
            (self.op.enabled_hypervisors and
3586
             hv_name in self.op.enabled_hypervisors)):
3587
          # either this is a new hypervisor, or its parameters have changed
3588
          hv_class = hypervisor.GetHypervisor(hv_name)
3589
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3590
          hv_class.CheckParameterSyntax(hv_params)
3591
          _CheckHVParams(self, node_list, hv_name, hv_params)
3592

    
3593
    if self.op.os_hvp:
3594
      # no need to check any newly-enabled hypervisors, since the
3595
      # defaults have already been checked in the above code-block
3596
      for os_name, os_hvp in self.new_os_hvp.items():
3597
        for hv_name, hv_params in os_hvp.items():
3598
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3599
          # we need to fill in the new os_hvp on top of the actual hv_p
3600
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3601
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3602
          hv_class = hypervisor.GetHypervisor(hv_name)
3603
          hv_class.CheckParameterSyntax(new_osp)
3604
          _CheckHVParams(self, node_list, hv_name, new_osp)
3605

    
3606
    if self.op.default_iallocator:
3607
      alloc_script = utils.FindFile(self.op.default_iallocator,
3608
                                    constants.IALLOCATOR_SEARCH_PATH,
3609
                                    os.path.isfile)
3610
      if alloc_script is None:
3611
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3612
                                   " specified" % self.op.default_iallocator,
3613
                                   errors.ECODE_INVAL)
3614

    
3615
  def Exec(self, feedback_fn):
3616
    """Change the parameters of the cluster.
3617

3618
    """
3619
    if self.op.vg_name is not None:
3620
      new_volume = self.op.vg_name
3621
      if not new_volume:
3622
        new_volume = None
3623
      if new_volume != self.cfg.GetVGName():
3624
        self.cfg.SetVGName(new_volume)
3625
      else:
3626
        feedback_fn("Cluster LVM configuration already in desired"
3627
                    " state, not changing")
3628
    if self.op.drbd_helper is not None:
3629
      new_helper = self.op.drbd_helper
3630
      if not new_helper:
3631
        new_helper = None
3632
      if new_helper != self.cfg.GetDRBDHelper():
3633
        self.cfg.SetDRBDHelper(new_helper)
3634
      else:
3635
        feedback_fn("Cluster DRBD helper already in desired state,"
3636
                    " not changing")
3637
    if self.op.hvparams:
3638
      self.cluster.hvparams = self.new_hvparams
3639
    if self.op.os_hvp:
3640
      self.cluster.os_hvp = self.new_os_hvp
3641
    if self.op.enabled_hypervisors is not None:
3642
      self.cluster.hvparams = self.new_hvparams
3643
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3644
    if self.op.beparams:
3645
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3646
    if self.op.nicparams:
3647
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3648
    if self.op.osparams:
3649
      self.cluster.osparams = self.new_osp
3650
    if self.op.ndparams:
3651
      self.cluster.ndparams = self.new_ndparams
3652

    
3653
    if self.op.candidate_pool_size is not None:
3654
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3655
      # we need to update the pool size here, otherwise the save will fail
3656
      _AdjustCandidatePool(self, [])
3657

    
3658
    if self.op.maintain_node_health is not None:
3659
      self.cluster.maintain_node_health = self.op.maintain_node_health
3660

    
3661
    if self.op.prealloc_wipe_disks is not None:
3662
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3663

    
3664
    if self.op.add_uids is not None:
3665
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3666

    
3667
    if self.op.remove_uids is not None:
3668
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3669

    
3670
    if self.op.uid_pool is not None:
3671
      self.cluster.uid_pool = self.op.uid_pool
3672

    
3673
    if self.op.default_iallocator is not None:
3674
      self.cluster.default_iallocator = self.op.default_iallocator
3675

    
3676
    if self.op.reserved_lvs is not None:
3677
      self.cluster.reserved_lvs = self.op.reserved_lvs
3678

    
3679
    def helper_os(aname, mods, desc):
3680
      desc += " OS list"
3681
      lst = getattr(self.cluster, aname)
3682
      for key, val in mods:
3683
        if key == constants.DDM_ADD:
3684
          if val in lst:
3685
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3686
          else:
3687
            lst.append(val)
3688
        elif key == constants.DDM_REMOVE:
3689
          if val in lst:
3690
            lst.remove(val)
3691
          else:
3692
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3693
        else:
3694
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3695

    
3696
    if self.op.hidden_os:
3697
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3698

    
3699
    if self.op.blacklisted_os:
3700
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3701

    
3702
    if self.op.master_netdev:
3703
      (master, ip, dev, netmask, _) = self.cfg.GetMasterNetworkParameters()
3704
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3705
                  self.cluster.master_netdev)
3706
      result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev)
3707
      result.Raise("Could not disable the master ip")
3708
      feedback_fn("Changing master_netdev from %s to %s" %
3709
                  (dev, self.op.master_netdev))
3710
      self.cluster.master_netdev = self.op.master_netdev
3711

    
3712
    if self.op.master_netmask:
3713
      master = self.cfg.GetMasterNode()
3714
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3715
      result = self.rpc.call_node_change_master_netmask(master,
3716
                                                        self.op.master_netmask)
3717
      if result.fail_msg:
3718
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3719
        self.LogWarning(msg)
3720
        feedback_fn(msg)
3721
      else:
3722
        self.cluster.master_netmask = self.op.master_netmask
3723

    
3724
    self.cfg.Update(self.cluster, feedback_fn)
3725

    
3726
    if self.op.master_netdev:
3727
      (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3728
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3729
                  self.op.master_netdev)
3730
      result = self.rpc.call_node_activate_master_ip(master, ip, netmask, dev,
3731
                                                     family)
3732
      if result.fail_msg:
3733
        self.LogWarning("Could not re-enable the master ip on"
3734
                        " the master, please restart manually: %s",
3735
                        result.fail_msg)
3736

    
3737

    
3738
def _UploadHelper(lu, nodes, fname):
3739
  """Helper for uploading a file and showing warnings.
3740

3741
  """
3742
  if os.path.exists(fname):
3743
    result = lu.rpc.call_upload_file(nodes, fname)
3744
    for to_node, to_result in result.items():
3745
      msg = to_result.fail_msg
3746
      if msg:
3747
        msg = ("Copy of file %s to node %s failed: %s" %
3748
               (fname, to_node, msg))
3749
        lu.proc.LogWarning(msg)
3750

    
3751

    
3752
def _ComputeAncillaryFiles(cluster, redist):
3753
  """Compute files external to Ganeti which need to be consistent.
3754

3755
  @type redist: boolean
3756
  @param redist: Whether to include files which need to be redistributed
3757

3758
  """
3759
  # Compute files for all nodes
3760
  files_all = set([
3761
    constants.SSH_KNOWN_HOSTS_FILE,
3762
    constants.CONFD_HMAC_KEY,
3763
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3764
    constants.SPICE_CERT_FILE,
3765
    constants.SPICE_CACERT_FILE,
3766
    constants.RAPI_USERS_FILE,
3767
    ])
3768

    
3769
  if not redist:
3770
    files_all.update(constants.ALL_CERT_FILES)
3771
    files_all.update(ssconf.SimpleStore().GetFileList())
3772
  else:
3773
    # we need to ship at least the RAPI certificate
3774
    files_all.add(constants.RAPI_CERT_FILE)
3775

    
3776
  if cluster.modify_etc_hosts:
3777
    files_all.add(constants.ETC_HOSTS)
3778

    
3779
  # Files which are optional, these must:
3780
  # - be present in one other category as well
3781
  # - either exist or not exist on all nodes of that category (mc, vm all)
3782
  files_opt = set([
3783
    constants.RAPI_USERS_FILE,
3784
    ])
3785

    
3786
  # Files which should only be on master candidates
3787
  files_mc = set()
3788
  if not redist:
3789
    files_mc.add(constants.CLUSTER_CONF_FILE)
3790

    
3791
  # Files which should only be on VM-capable nodes
3792
  files_vm = set(filename
3793
    for hv_name in cluster.enabled_hypervisors
3794
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3795

    
3796
  files_opt |= set(filename
3797
    for hv_name in cluster.enabled_hypervisors
3798
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3799

    
3800
  # Filenames in each category must be unique
3801
  all_files_set = files_all | files_mc | files_vm
3802
  assert (len(all_files_set) ==
3803
          sum(map(len, [files_all, files_mc, files_vm]))), \
3804
         "Found file listed in more than one file list"
3805

    
3806
  # Optional files must be present in one other category
3807
  assert all_files_set.issuperset(files_opt), \
3808
         "Optional file not in a different required list"
3809

    
3810
  return (files_all, files_opt, files_mc, files_vm)
3811

    
3812

    
3813
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3814
  """Distribute additional files which are part of the cluster configuration.
3815

3816
  ConfigWriter takes care of distributing the config and ssconf files, but
3817
  there are more files which should be distributed to all nodes. This function
3818
  makes sure those are copied.
3819

3820
  @param lu: calling logical unit
3821
  @param additional_nodes: list of nodes not in the config to distribute to
3822
  @type additional_vm: boolean
3823
  @param additional_vm: whether the additional nodes are vm-capable or not
3824

3825
  """
3826
  # Gather target nodes
3827
  cluster = lu.cfg.GetClusterInfo()
3828
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3829

    
3830
  online_nodes = lu.cfg.GetOnlineNodeList()
3831
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3832

    
3833
  if additional_nodes is not None:
3834
    online_nodes.extend(additional_nodes)
3835
    if additional_vm:
3836
      vm_nodes.extend(additional_nodes)
3837

    
3838
  # Never distribute to master node
3839
  for nodelist in [online_nodes, vm_nodes]:
3840
    if master_info.name in nodelist:
3841
      nodelist.remove(master_info.name)
3842

    
3843
  # Gather file lists
3844
  (files_all, _, files_mc, files_vm) = \
3845
    _ComputeAncillaryFiles(cluster, True)
3846

    
3847
  # Never re-distribute configuration file from here
3848
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3849
              constants.CLUSTER_CONF_FILE in files_vm)
3850
  assert not files_mc, "Master candidates not handled in this function"
3851

    
3852
  filemap = [
3853
    (online_nodes, files_all),
3854
    (vm_nodes, files_vm),
3855
    ]
3856

    
3857
  # Upload the files
3858
  for (node_list, files) in filemap:
3859
    for fname in files:
3860
      _UploadHelper(lu, node_list, fname)
3861

    
3862

    
3863
class LUClusterRedistConf(NoHooksLU):
3864
  """Force the redistribution of cluster configuration.
3865

3866
  This is a very simple LU.
3867

3868
  """
3869
  REQ_BGL = False
3870

    
3871
  def ExpandNames(self):
3872
    self.needed_locks = {
3873
      locking.LEVEL_NODE: locking.ALL_SET,
3874
    }
3875
    self.share_locks[locking.LEVEL_NODE] = 1
3876

    
3877
  def Exec(self, feedback_fn):
3878
    """Redistribute the configuration.
3879

3880
    """
3881
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3882
    _RedistributeAncillaryFiles(self)
3883

    
3884

    
3885
class LUClusterActivateMasterIp(NoHooksLU):
3886
  """Activate the master IP on the master node.
3887

3888
  """
3889
  def Exec(self, feedback_fn):
3890
    """Activate the master IP.
3891

3892
    """
3893
    (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters()
3894
    self.rpc.call_node_activate_master_ip(master, ip, netmask, dev, family)
3895

    
3896

    
3897
class LUClusterDeactivateMasterIp(NoHooksLU):
3898
  """Deactivate the master IP on the master node.
3899

3900
  """
3901
  def Exec(self, feedback_fn):
3902
    """Deactivate the master IP.
3903

3904
    """
3905
    (master, ip, dev, netmask, _) = self.cfg.GetMasterNetworkParameters()
3906
    self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev)
3907

    
3908

    
3909
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3910
  """Sleep and poll for an instance's disk to sync.
3911

3912
  """
3913
  if not instance.disks or disks is not None and not disks:
3914
    return True
3915

    
3916
  disks = _ExpandCheckDisks(instance, disks)
3917

    
3918
  if not oneshot:
3919
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3920

    
3921
  node = instance.primary_node
3922

    
3923
  for dev in disks:
3924
    lu.cfg.SetDiskID(dev, node)
3925

    
3926
  # TODO: Convert to utils.Retry
3927

    
3928
  retries = 0
3929
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3930
  while True:
3931
    max_time = 0
3932
    done = True
3933
    cumul_degraded = False
3934
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3935
    msg = rstats.fail_msg
3936
    if msg:
3937
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3938
      retries += 1
3939
      if retries >= 10:
3940
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3941
                                 " aborting." % node)
3942
      time.sleep(6)
3943
      continue
3944
    rstats = rstats.payload
3945
    retries = 0
3946
    for i, mstat in enumerate(rstats):
3947
      if mstat is None:
3948
        lu.LogWarning("Can't compute data for node %s/%s",
3949
                           node, disks[i].iv_name)
3950
        continue
3951

    
3952
      cumul_degraded = (cumul_degraded or
3953
                        (mstat.is_degraded and mstat.sync_percent is None))
3954
      if mstat.sync_percent is not None:
3955
        done = False
3956
        if mstat.estimated_time is not None:
3957
          rem_time = ("%s remaining (estimated)" %
3958
                      utils.FormatSeconds(mstat.estimated_time))
3959
          max_time = mstat.estimated_time
3960
        else:
3961
          rem_time = "no time estimate"
3962
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3963
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3964

    
3965
    # if we're done but degraded, let's do a few small retries, to
3966
    # make sure we see a stable and not transient situation; therefore
3967
    # we force restart of the loop
3968
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3969
      logging.info("Degraded disks found, %d retries left", degr_retries)
3970
      degr_retries -= 1
3971
      time.sleep(1)
3972
      continue
3973

    
3974
    if done or oneshot:
3975
      break
3976

    
3977
    time.sleep(min(60, max_time))
3978

    
3979
  if done:
3980
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
3981
  return not cumul_degraded
3982

    
3983

    
3984
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3985
  """Check that mirrors are not degraded.
3986

3987
  The ldisk parameter, if True, will change the test from the
3988
  is_degraded attribute (which represents overall non-ok status for
3989
  the device(s)) to the ldisk (representing the local storage status).
3990

3991
  """
3992
  lu.cfg.SetDiskID(dev, node)
3993

    
3994
  result = True
3995

    
3996
  if on_primary or dev.AssembleOnSecondary():
3997
    rstats = lu.rpc.call_blockdev_find(node, dev)
3998
    msg = rstats.fail_msg
3999
    if msg:
4000
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
4001
      result = False
4002
    elif not rstats.payload:
4003
      lu.LogWarning("Can't find disk on node %s", node)
4004
      result = False
4005
    else:
4006
      if ldisk:
4007
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4008
      else:
4009
        result = result and not rstats.payload.is_degraded
4010

    
4011
  if dev.children:
4012
    for child in dev.children:
4013
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
4014

    
4015
  return result
4016

    
4017

    
4018
class LUOobCommand(NoHooksLU):
4019
  """Logical unit for OOB handling.
4020

4021
  """
4022
  REG_BGL = False
4023
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
4024

    
4025
  def ExpandNames(self):
4026
    """Gather locks we need.
4027

4028
    """
4029
    if self.op.node_names:
4030
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
4031
      lock_names = self.op.node_names
4032
    else:
4033
      lock_names = locking.ALL_SET
4034

    
4035
    self.needed_locks = {
4036
      locking.LEVEL_NODE: lock_names,
4037
      }
4038

    
4039
  def CheckPrereq(self):
4040
    """Check prerequisites.
4041

4042
    This checks:
4043
     - the node exists in the configuration
4044
     - OOB is supported
4045

4046
    Any errors are signaled by raising errors.OpPrereqError.
4047

4048
    """
4049
    self.nodes = []
4050
    self.master_node = self.cfg.GetMasterNode()
4051

    
4052
    assert self.op.power_delay >= 0.0
4053

    
4054
    if self.op.node_names:
4055
      if (self.op.command in self._SKIP_MASTER and
4056
          self.master_node in self.op.node_names):
4057
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
4058
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
4059

    
4060
        if master_oob_handler:
4061
          additional_text = ("run '%s %s %s' if you want to operate on the"
4062
                             " master regardless") % (master_oob_handler,
4063
                                                      self.op.command,
4064
                                                      self.master_node)
4065
        else:
4066
          additional_text = "it does not support out-of-band operations"
4067

    
4068
        raise errors.OpPrereqError(("Operating on the master node %s is not"
4069
                                    " allowed for %s; %s") %
4070
                                   (self.master_node, self.op.command,
4071
                                    additional_text), errors.ECODE_INVAL)
4072
    else:
4073
      self.op.node_names = self.cfg.GetNodeList()
4074
      if self.op.command in self._SKIP_MASTER:
4075
        self.op.node_names.remove(self.master_node)
4076

    
4077
    if self.op.command in self._SKIP_MASTER:
4078
      assert self.master_node not in self.op.node_names
4079

    
4080
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
4081
      if node is None:
4082
        raise errors.OpPrereqError("Node %s not found" % node_name,
4083
                                   errors.ECODE_NOENT)
4084
      else:
4085
        self.nodes.append(node)
4086

    
4087
      if (not self.op.ignore_status and
4088
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
4089
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
4090
                                    " not marked offline") % node_name,
4091
                                   errors.ECODE_STATE)
4092

    
4093
  def Exec(self, feedback_fn):
4094
    """Execute OOB a