#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc

import ganeti.masterd.instance # pylint: disable=W0611


#: Size of DRBD meta block device
DRBD_META_SIZE = 128


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcodes.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs
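

# Illustrative sketch (not part of the original module): how an LU's Exec
# method might hand follow-up work back to the job queue by returning
# ResultWithJobs. The opcode and keyword argument shown below are made-up
# examples; each inner list becomes one submitted job.
#
#   def Exec(self, feedback_fn):
#     jobs = [
#       [opcodes.OpClusterVerifyConfig(ignore_errors=[])],
#       ]
#     # Extra keyword arguments end up in the "other" attribute of the result.
#     return ResultWithJobs(jobs, summary="verification submitted")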


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc_runner):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc_runner
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and 'could
    # be a function' warnings
    # pylint: disable=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False,
                          level=locking.LEVEL_NODE):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances
    @param level: Which lock level to use for locking nodes

    """
    assert level in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
      self.needed_locks[level] = wanted_nodes
    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
      self.needed_locks[level].extend(wanted_nodes)
    else:
      raise errors.ProgrammerError("Unknown recalculation mode")

    del self.recalculate_locks[level]
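

# Illustrative sketch (not part of the original module): the smallest shape a
# concurrent LU can take, following the rules documented in LogicalUnit above.
# "LUExampleNoop" and its opcode are hypothetical names used only for
# illustration.
#
#   class LUExampleNoop(NoHooksLU):
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # Lock one instance and, later, its nodes (see _LockInstancesNodes).
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Instance %s locked" % self.op.instance_name)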


class NoHooksLU(LogicalUnit): # pylint: disable=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
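

# Illustrative sketch (not part of the original module): an LU can delegate its
# CheckPrereq/Exec work to tasklets by filling self.tasklets in ExpandNames;
# LogicalUnit.CheckPrereq and LogicalUnit.Exec above then iterate over them.
# "_ExampleTasklet" is a hypothetical name.
#
#   class _ExampleTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("tasklet executed")
#
#   # inside some LU's ExpandNames:
#   #   self.tasklets = [_ExampleTasklet(self)]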


class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, qfilter, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Sort only if no names were requested
    self.sort_by_name = not self.names

    self.do_locking = None
    self.wanted = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    if self.do_locking:
      names = lu.owned_locks(lock_level)
    else:
      names = all_names

    if self.wanted == locking.ALL_SET:
      assert not self.names
      # caller didn't specify names, so ordering is not important
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return self.query.OldStyleQuery(self._GetQueryData(lu),
                                    sort_by_name=self.sort_by_name)


def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)
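

# Illustrative sketch (not part of the original module): an LU that only reads
# data can combine _ShareAll() with its lock declaration so every level is
# acquired in shared mode, as described in LogicalUnit.ExpandNames.
#
#   def ExpandNames(self):
#     self.share_locks = _ShareAll()
#     self.needed_locks = {
#       locking.LEVEL_NODE: locking.ALL_SET,
#       }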


def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is of a wrong type

  """
  if nodes:
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of a wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
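

# Illustrative sketch (not part of the original module): how _GetUpdatedParams
# merges a parameter update. Values equal to constants.VALUE_DEFAULT (or None,
# with use_none=True) delete the key, everything else overwrites it. The
# parameter names are made-up examples.
#
#   old = {"kernel_path": "/vmlinuz", "acpi": True}
#   upd = {"acpi": constants.VALUE_DEFAULT, "serial_console": False}
#   _GetUpdatedParams(old, upd)
#   # -> {"kernel_path": "/vmlinuz", "serial_console": False}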


def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
         "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  if should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in lu.owned_locks(level):
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
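

# Illustrative sketch (not part of the original module): typical calls into
# _ReleaseLocks. At most one of "names" and "keep" may be given; with neither,
# every lock at the level is released. The node names are made-up examples.
#
#   # keep only the primary node's lock, release the rest
#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=[instance.primary_node])
#   # release two specific node locks
#   _ReleaseLocks(self, locking.LEVEL_NODE, names=["node1.example.com",
#                                                  "node2.example.com"])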


def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)


def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # pylint: disable=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  if not tags:
    tags = []

  env["INSTANCE_TAGS"] = " ".join(tags)

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
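

# Illustrative sketch (not part of the original module): the kind of dictionary
# _BuildInstanceHookEnv produces for a single-NIC, single-disk instance. All
# values are made-up examples; the hooks runner later prefixes each key with
# "GANETI_" (see LogicalUnit.BuildHooksEnv above).
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     ...
#   }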


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_up,
    "memory": bep[constants.BE_MEMORY],
    "vcpus": bep[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": bep,
    "hvp": hvp,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator")


def _GetDefaultIAllocator(cfg, iallocator):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type iallocator: string or None
  @param iallocator: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not iallocator:
    # Use default iallocator
    iallocator = cfg.GetDefaultIAllocator()

  if not iallocator:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return iallocator
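

# Illustrative sketch (not part of the original module): how the two iallocator
# helpers above are typically used from an LU. The opcode slot names below
# ("iallocator", "pnode") are examples only.
#
#   def CheckArguments(self):
#     # fills self.op.iallocator from the cluster default if neither an
#     # iallocator nor a target node was given
#     _CheckIAllocatorOrNode(self, "iallocator", "pnode")
#
#   # later, when only an iallocator name (possibly None) is at hand:
#   #   name = _GetDefaultIAllocator(self.cfg, self.op.iallocator)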


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master_params.name)

    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params)
    result.Raise("Could not disable the master role")

    return master_params.name


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
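

# Illustrative sketch (not part of the original module): the shape of the list
# returned by _GetAllHypervisorParameters. Hypervisor names, OS names and
# instance names are made-up examples.
#
#   [
#     ("cluster", "kvm", {...cluster-level defaults...}),
#     ("os debian-image", "kvm", {...os-specific overrides, filled...}),
#     ("instance inst1.example.com", "kvm", {...fully filled parameters...}),
#   ]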


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101

  def _ErrorIf(self, cond, ecode, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable=E1101

    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    (_, etxt, _) = ecode
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING

    if cond:
      self._Error(ecode, *args, **kwargs)

    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
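

# Illustrative sketch (not part of the original module): how verify LUs use the
# mix-in above from inside Exec; the condition and message below are made-up,
# see LUClusterVerifyConfig further down for real call sites.
#
#   self.bad = False
#   self._feedback_fn = feedback_fn
#   self._ErrorIf(not nodes, constants.CV_ECLUSTERCFG, None,
#                 "cluster has no nodes")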


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
                                            ignore_errors=self.op.ignore_errors,
                                            depends=depends_fn())]
                for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
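

# Illustrative note (not part of the original module): with no group name, the
# Exec method above submits one leading config-verification job plus one job
# per node group, and each group job carries a relative dependency pointing
# back at the config job, roughly:
#
#   jobs = [
#     [OpClusterVerifyConfig(...)],
#     [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])], ...)],
#     [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])], ...)],
#   ]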
1540

    
1541

    
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = True

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of the cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad

class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call failed (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)
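  # Worked example (added note, not in the original source): with a clock
  # skew allowance of, say, 150 seconds, a node whose merged time is 12:00:00
  # while the verify RPC started at 12:03:00 falls outside the allowed
  # window; the reported divergence is abs(nvinfo_starttime - ntime_merged),
  # i.e. roughly 180s in this case.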

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)
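  # Worked example (added note, not in the original source): if this node is
  # secondary for two auto-balanced instances whose primary is node B, with
  # BE_MEMORY of 2048 and 1024 MiB, then needed_mem is 3072 MiB; a reported
  # mfree of 2048 MiB would trigger the CV_ENODEN1 finding, because B's
  # instances could not all be failed over here.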

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{NodeImage})
    @param node_image: Node image objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk
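  # Illustrative note (added sketch, not in the original source): for an
  # instance "inst1" with one DRBD disk on nodes "node1" and "node2", the
  # returned mapping would look like
  #   {"inst1": {"node1": [(True, status)], "node2": [(True, status)]}}
  # while diskless instances map to an empty inner dictionary.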

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))
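  # Illustrative note (added sketch, not in the original source): for a group
  # with online nodes n1 and n2 and two other node groups, the returned dict
  # maps each of n1 and n2 to a sorted list containing one node picked
  # (round-robin, via itertools.cycle) from every other group, so SSH
  # reachability is probed both inside the group and across group boundaries.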

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

2710
    # bridge checks
2711
    # FIXME: this needs to be changed per node-group, not cluster-wide
2712
    bridges = set()
2713
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2714
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2715
      bridges.add(default_nicpp[constants.NIC_LINK])
2716
    for instance in self.my_inst_info.values():
2717
      for nic in instance.nics:
2718
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2719
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2720
          bridges.add(full_nic[constants.NIC_LINK])
2721

    
2722
    if bridges:
2723
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2724

    
2725
    # Build our expected cluster state
2726
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2727
                                                 name=node.name,
2728
                                                 vm_capable=node.vm_capable))
2729
                      for node in node_data_list)
2730

    
2731
    # Gather OOB paths
2732
    oob_paths = []
2733
    for node in self.all_node_info.values():
2734
      path = _SupportsOob(self.cfg, node)
2735
      if path and path not in oob_paths:
2736
        oob_paths.append(path)
2737

    
2738
    if oob_paths:
2739
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2740

    
2741
    for instance in self.my_inst_names:
2742
      inst_config = self.my_inst_info[instance]
2743

    
2744
      for nname in inst_config.all_nodes:
2745
        if nname not in node_image:
2746
          gnode = self.NodeImage(name=nname)
2747
          gnode.ghost = (nname not in self.all_node_info)
2748
          node_image[nname] = gnode
2749

    
2750
      inst_config.MapLVsByNode(node_vol_should)
2751

    
2752
      pnode = inst_config.primary_node
2753
      node_image[pnode].pinst.append(instance)
2754

    
2755
      for snode in inst_config.secondary_nodes:
2756
        nimg = node_image[snode]
2757
        nimg.sinst.append(instance)
2758
        if pnode not in nimg.sbp:
2759
          nimg.sbp[pnode] = []
2760
        nimg.sbp[pnode].append(instance)
2761

    
2762
    # At this point, we have the in-memory data structures complete,
2763
    # except for the runtime information, which we'll gather next
2764

    
2765
    # Due to the way our RPC system works, exact response times cannot be
2766
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2767
    # time before and after executing the request, we can at least have a time
2768
    # window.
2769
    nvinfo_starttime = time.time()
2770
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2771
                                           node_verify_param,
2772
                                           self.cfg.GetClusterName())
2773
    nvinfo_endtime = time.time()
2774

    
2775
    if self.extra_lv_nodes and vg_name is not None:
2776
      extra_lv_nvinfo = \
2777
          self.rpc.call_node_verify(self.extra_lv_nodes,
2778
                                    {constants.NV_LVLIST: vg_name},
2779
                                    self.cfg.GetClusterName())
2780
    else:
2781
      extra_lv_nvinfo = {}
2782

    
2783
    all_drbd_map = self.cfg.ComputeDRBDMap()
2784

    
2785
    feedback_fn("* Gathering disk information (%s nodes)" %
2786
                len(self.my_node_names))
2787
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2788
                                     self.my_inst_info)
2789

    
2790
    feedback_fn("* Verifying configuration file consistency")
2791

    
2792
    # If not all nodes are being checked, we need to make sure the master node
2793
    # and a non-checked vm_capable node are in the list.
2794
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2795
    if absent_nodes:
2796
      vf_nvinfo = all_nvinfo.copy()
2797
      vf_node_info = list(self.my_node_info.values())
2798
      additional_nodes = []
2799
      if master_node not in self.my_node_info:
2800
        additional_nodes.append(master_node)
2801
        vf_node_info.append(self.all_node_info[master_node])
2802
      # Add the first vm_capable node we find which is not included
2803
      for node in absent_nodes:
2804
        nodeinfo = self.all_node_info[node]
2805
        if nodeinfo.vm_capable and not nodeinfo.offline:
2806
          additional_nodes.append(node)
2807
          vf_node_info.append(self.all_node_info[node])
2808
          break
2809
      key = constants.NV_FILELIST
2810
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2811
                                                 {key: node_verify_param[key]},
2812
                                                 self.cfg.GetClusterName()))
2813
    else:
2814
      vf_nvinfo = all_nvinfo
2815
      vf_node_info = self.my_node_info.values()
2816

    
2817
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2818

    
2819
    feedback_fn("* Verifying node status")
2820

    
2821
    refos_img = None
2822

    
2823
    for node_i in node_data_list:
2824
      node = node_i.name
2825
      nimg = node_image[node]
2826

    
2827
      if node_i.offline:
2828
        if verbose:
2829
          feedback_fn("* Skipping offline node %s" % (node,))
2830
        n_offline += 1
2831
        continue
2832

    
2833
      if node == master_node:
2834
        ntype = "master"
2835
      elif node_i.master_candidate:
2836
        ntype = "master candidate"
2837
      elif node_i.drained:
2838
        ntype = "drained"
2839
        n_drained += 1
2840
      else:
2841
        ntype = "regular"
2842
      if verbose:
2843
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2844

    
2845
      msg = all_nvinfo[node].fail_msg
2846
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2847
               msg)
2848
      if msg:
2849
        nimg.rpc_fail = True
2850
        continue
2851

    
2852
      nresult = all_nvinfo[node].payload
2853

    
2854
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2855
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2856
      self._VerifyNodeNetwork(node_i, nresult)
2857
      self._VerifyNodeUserScripts(node_i, nresult)
2858
      self._VerifyOob(node_i, nresult)
2859

    
2860
      if nimg.vm_capable:
2861
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2862
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2863
                             all_drbd_map)
2864

    
2865
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2866
        self._UpdateNodeInstances(node_i, nresult, nimg)
2867
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2868
        self._UpdateNodeOS(node_i, nresult, nimg)
2869

    
2870
        if not nimg.os_fail:
2871
          if refos_img is None:
2872
            refos_img = nimg
2873
          self._VerifyNodeOS(node_i, nimg, refos_img)
2874
        self._VerifyNodeBridges(node_i, nresult, bridges)
2875

    
2876
        # Check whether all running instances are primary for the node. (This
2877
        # can no longer be done from _VerifyInstance below, since some of the
2878
        # wrong instances could be from other node groups.)
2879
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2880

    
2881
        for inst in non_primary_inst:
2882
          test = inst in self.all_inst_info
2883
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2884
                   "instance should not run on node %s", node_i.name)
2885
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2886
                   "node is running unknown instance %s", inst)
2887

    
2888
    for node, result in extra_lv_nvinfo.items():
2889
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2890
                              node_image[node], vg_name)
2891

    
2892
    feedback_fn("* Verifying instance status")
2893
    for instance in self.my_inst_names:
2894
      if verbose:
2895
        feedback_fn("* Verifying instance %s" % instance)
2896
      inst_config = self.my_inst_info[instance]
2897
      self._VerifyInstance(instance, inst_config, node_image,
2898
                           instdisk[instance])
2899
      inst_nodes_offline = []
2900

    
2901
      pnode = inst_config.primary_node
2902
      pnode_img = node_image[pnode]
2903
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2904
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2905
               " primary node failed", instance)
2906

    
2907
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2908
               constants.CV_EINSTANCEBADNODE, instance,
2909
               "instance is marked as running and lives on offline node %s",
2910
               inst_config.primary_node)
2911

    
2912
      # If the instance is non-redundant we cannot survive losing its primary
2913
      # node, so we are not N+1 compliant. On the other hand we have no disk
2914
      # templates with more than one secondary so that situation is not well
2915
      # supported either.
2916
      # FIXME: does not support file-backed instances
2917
      if not inst_config.secondary_nodes:
2918
        i_non_redundant.append(instance)
2919

    
2920
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2921
               constants.CV_EINSTANCELAYOUT,
2922
               instance, "instance has multiple secondary nodes: %s",
2923
               utils.CommaJoin(inst_config.secondary_nodes),
2924
               code=self.ETYPE_WARNING)
2925

    
2926
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2927
        pnode = inst_config.primary_node
2928
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2929
        instance_groups = {}
2930

    
2931
        for node in instance_nodes:
2932
          instance_groups.setdefault(self.all_node_info[node].group,
2933
                                     []).append(node)
2934

    
2935
        pretty_list = [
2936
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2937
          # Sort so that we always list the primary node first.
2938
          for group, nodes in sorted(instance_groups.items(),
2939
                                     key=lambda (_, nodes): pnode in nodes,
2940
                                     reverse=True)]
2941

    
2942
        self._ErrorIf(len(instance_groups) > 1,
2943
                      constants.CV_EINSTANCESPLITGROUPS,
2944
                      instance, "instance has primary and secondary nodes in"
2945
                      " different groups: %s", utils.CommaJoin(pretty_list),
2946
                      code=self.ETYPE_WARNING)
2947

    
2948
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2949
        i_non_a_balanced.append(instance)
2950

    
2951
      for snode in inst_config.secondary_nodes:
2952
        s_img = node_image[snode]
2953
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2954
                 snode, "instance %s, connection to secondary node failed",
2955
                 instance)
2956

    
2957
        if s_img.offline:
2958
          inst_nodes_offline.append(snode)
2959

    
2960
      # warn that the instance lives on offline nodes
2961
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2962
               "instance has offline secondary node(s) %s",
2963
               utils.CommaJoin(inst_nodes_offline))
2964
      # ... or ghost/non-vm_capable nodes
2965
      for node in inst_config.all_nodes:
2966
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2967
                 instance, "instance lives on ghost node %s", node)
2968
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2969
                 instance, "instance lives on non-vm_capable node %s", node)
2970

    
2971
    feedback_fn("* Verifying orphan volumes")
2972
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2973

    
2974
    # We will get spurious "unknown volume" warnings if any node of this group
2975
    # is secondary for an instance whose primary is in another group. To avoid
2976
    # them, we find these instances and add their volumes to node_vol_should.
2977
    for inst in self.all_inst_info.values():
2978
      for secondary in inst.secondary_nodes:
2979
        if (secondary in self.my_node_info
2980
            and inst.name not in self.my_inst_info):
2981
          inst.MapLVsByNode(node_vol_should)
2982
          break
2983

    
2984
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2985

    
2986
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2987
      feedback_fn("* Verifying N+1 Memory redundancy")
2988
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2989

    
2990
    feedback_fn("* Other Notes")
2991
    if i_non_redundant:
2992
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2993
                  % len(i_non_redundant))
2994

    
2995
    if i_non_a_balanced:
2996
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2997
                  % len(i_non_a_balanced))
2998

    
2999
    if n_offline:
3000
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3001

    
3002
    if n_drained:
3003
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3004

    
3005
    return not self.bad
3006

    
3007
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3008
    """Analyze the post-hooks' result
3009

3010
    This method analyses the hook result, handles it, and sends some
3011
    nicely-formatted feedback back to the user.
3012

3013
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3014
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3015
    @param hooks_results: the results of the multi-node hooks rpc call
3016
    @param feedback_fn: function used to send feedback back to the caller
3017
    @param lu_result: previous Exec result
3018
    @return: the new Exec result, based on the previous result
3019
        and hook results
3020

3021
    """
3022
    # We only really run POST phase hooks, only for non-empty groups,
3023
    # and are only interested in their results
3024
    if not self.my_node_names:
3025
      # empty node group
3026
      pass
3027
    elif phase == constants.HOOKS_PHASE_POST:
3028
      # Used to change hooks' output to proper indentation
3029
      feedback_fn("* Hooks Results")
3030
      assert hooks_results, "invalid result from hooks"
3031

    
3032
      for node_name in hooks_results:
3033
        res = hooks_results[node_name]
3034
        msg = res.fail_msg
3035
        test = msg and not res.offline
3036
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3037
                      "Communication failure in hooks execution: %s", msg)
3038
        if res.offline or msg:
3039
          # No need to investigate payload if node is offline or gave
3040
          # an error.
3041
          continue
3042
        for script, hkr, output in res.payload:
3043
          test = hkr == constants.HKR_FAIL
3044
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3045
                        "Script %s failed, output:", script)
3046
          if test:
3047
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3048
            feedback_fn("%s" % output)
3049
            lu_result = False
3050

    
3051
    return lu_result
3052

    
3053

    
3054
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])

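# Illustrative sketch (an assumption for documentation, not original code):
# the per-group job layout handed to ResultWithJobs above.  Each inner list
# is one job, and here every job holds a single OpGroupVerifyDisks opcode.
#
#   group_names = ["default", "rack2"]   # hypothetical node group names
#   jobs = [[opcodes.OpGroupVerifyDisks(group_name=g)] for g in group_names]
#   # -> one independent job per node group
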
class LUGroupVerifyDisks(NoHooksLU):
3075
  """Verifies the status of all disks in a node group.
3076

3077
  """
3078
  REQ_BGL = False
3079

    
3080
  def ExpandNames(self):
3081
    # Raises errors.OpPrereqError on its own if group can't be found
3082
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3083

    
3084
    self.share_locks = _ShareAll()
3085
    self.needed_locks = {
3086
      locking.LEVEL_INSTANCE: [],
3087
      locking.LEVEL_NODEGROUP: [],
3088
      locking.LEVEL_NODE: [],
3089
      }
3090

    
3091
  def DeclareLocks(self, level):
3092
    if level == locking.LEVEL_INSTANCE:
3093
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3094

    
3095
      # Lock instances optimistically, needs verification once node and group
3096
      # locks have been acquired
3097
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3098
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3099

    
3100
    elif level == locking.LEVEL_NODEGROUP:
3101
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3102

    
3103
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3104
        set([self.group_uuid] +
3105
            # Lock all groups used by instances optimistically; this requires
3106
            # going via the node before it's locked, requiring verification
3107
            # later on
3108
            [group_uuid
3109
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3110
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3111

    
3112
    elif level == locking.LEVEL_NODE:
3113
      # This will only lock the nodes in the group to be verified which contain
3114
      # actual instances
3115
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3116
      self._LockInstancesNodes()
3117

    
3118
      # Lock all nodes in group to be verified
3119
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3120
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3121
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3122

    
3123
  def CheckPrereq(self):
3124
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3125
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3126
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3127

    
3128
    assert self.group_uuid in owned_groups
3129

    
3130
    # Check if locked instances are still correct
3131
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3132

    
3133
    # Get instance information
3134
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3135

    
3136
    # Check if node groups for locked instances are still correct
3137
    for (instance_name, inst) in self.instances.items():
3138
      assert owned_nodes.issuperset(inst.all_nodes), \
3139
        "Instance %s's nodes changed while we kept the lock" % instance_name
3140

    
3141
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3142
                                             owned_groups)
3143

    
3144
      assert self.group_uuid in inst_groups, \
3145
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3146

    
3147
  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(list(key))

    return (res_nodes, list(res_instances), res_missing)

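# Illustrative sketch (assumption, not original code): unpacking the tuple
# returned by LUGroupVerifyDisks.Exec above; `result` stands for that
# (res_nodes, res_instances, res_missing) value.
#
#   res_nodes, res_instances, res_missing = result
#   for node, err in res_nodes.items():        # per-node LV enumeration errors
#     feedback_fn("node %s: %s" % (node, err))
#   for inst in res_instances:                 # instances needing activate-disks
#     feedback_fn("instance %s has offline LVs" % inst)
#   for inst, vols in res_missing.items():     # missing (node, LV) pairs
#     feedback_fn("instance %s misses volumes: %s" % (inst, vols))
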
class LUClusterRepairDiskSizes(NoHooksLU):
3194
  """Verifies the cluster disks sizes.
3195

3196
  """
3197
  REQ_BGL = False
3198

    
3199
  def ExpandNames(self):
3200
    if self.op.instances:
3201
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3202
      self.needed_locks = {
3203
        locking.LEVEL_NODE: [],
3204
        locking.LEVEL_INSTANCE: self.wanted_names,
3205
        }
3206
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3207
    else:
3208
      self.wanted_names = None
3209
      self.needed_locks = {
3210
        locking.LEVEL_NODE: locking.ALL_SET,
3211
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3212
        }
3213
    self.share_locks = _ShareAll()
3214

    
3215
  def DeclareLocks(self, level):
3216
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
3217
      self._LockInstancesNodes(primary_only=True)
3218

    
3219
  def CheckPrereq(self):
3220
    """Check prerequisites.
3221

3222
    This only checks the optional instance list against the existing names.
3223

3224
    """
3225
    if self.wanted_names is None:
3226
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3227

    
3228
    self.wanted_instances = \
3229
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3230

    
3231
  def _EnsureChildSizes(self, disk):
3232
    """Ensure children of the disk have the needed disk size.
3233

3234
    This is valid mainly for DRBD8 and fixes an issue where the
3235
    children have smaller disk size.
3236

3237
    @param disk: an L{ganeti.objects.Disk} object
3238

3239
    """
3240
    if disk.dev_type == constants.LD_DRBD8:
3241
      assert disk.children, "Empty children for DRBD8?"
3242
      fchild = disk.children[0]
3243
      mismatch = fchild.size < disk.size
3244
      if mismatch:
3245
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3246
                     fchild.size, disk.size)
3247
        fchild.size = disk.size
3248

    
3249
      # and we recurse on this child only, not on the metadev
3250
      return self._EnsureChildSizes(fchild) or mismatch
3251
    else:
3252
      return False
3253

    
3254
  def Exec(self, feedback_fn):
3255
    """Verify the size of cluster disks.
3256

3257
    """
3258
    # TODO: check child disks too
3259
    # TODO: check differences in size between primary/secondary nodes
3260
    per_node_disks = {}
3261
    for instance in self.wanted_instances:
3262
      pnode = instance.primary_node
3263
      if pnode not in per_node_disks:
3264
        per_node_disks[pnode] = []
3265
      for idx, disk in enumerate(instance.disks):
3266
        per_node_disks[pnode].append((instance, idx, disk))
3267

    
3268
    changed = []
3269
    for node, dskl in per_node_disks.items():
3270
      newl = [v[2].Copy() for v in dskl]
3271
      for dsk in newl:
3272
        self.cfg.SetDiskID(dsk, node)
3273
      result = self.rpc.call_blockdev_getsize(node, newl)
3274
      if result.fail_msg:
3275
        self.LogWarning("Failure in blockdev_getsize call to node"
3276
                        " %s, ignoring", node)
3277
        continue
3278
      if len(result.payload) != len(dskl):
3279
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
3281
        self.LogWarning("Invalid result from node %s, ignoring node results",
3282
                        node)
3283
        continue
3284
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3285
        if size is None:
3286
          self.LogWarning("Disk %d of instance %s did not return size"
3287
                          " information, ignoring", idx, instance.name)
3288
          continue
3289
        if not isinstance(size, (int, long)):
3290
          self.LogWarning("Disk %d of instance %s did not return valid"
3291
                          " size information, ignoring", idx, instance.name)
3292
          continue
3293
        size = size >> 20
3294
        if size != disk.size:
3295
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3296
                       " correcting: recorded %d, actual %d", idx,
3297
                       instance.name, disk.size, size)
3298
          disk.size = size
3299
          self.cfg.Update(instance, feedback_fn)
3300
          changed.append((instance.name, idx, size))
3301
        if self._EnsureChildSizes(disk):
3302
          self.cfg.Update(instance, feedback_fn)
3303
          changed.append((instance.name, idx, disk.size))
3304
    return changed
3305

    
3306

    
3307
class LUClusterRename(LogicalUnit):
3308
  """Rename the cluster.
3309

3310
  """
3311
  HPATH = "cluster-rename"
3312
  HTYPE = constants.HTYPE_CLUSTER
3313

    
3314
  def BuildHooksEnv(self):
3315
    """Build hooks env.
3316

3317
    """
3318
    return {
3319
      "OP_TARGET": self.cfg.GetClusterName(),
3320
      "NEW_NAME": self.op.name,
3321
      }
3322

    
3323
  def BuildHooksNodes(self):
3324
    """Build hooks nodes.
3325

3326
    """
3327
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3328

    
3329
  def CheckPrereq(self):
3330
    """Verify that the passed name is a valid one.
3331

3332
    """
3333
    hostname = netutils.GetHostname(name=self.op.name,
3334
                                    family=self.cfg.GetPrimaryIPFamily())
3335

    
3336
    new_name = hostname.name
3337
    self.ip = new_ip = hostname.ip
3338
    old_name = self.cfg.GetClusterName()
3339
    old_ip = self.cfg.GetMasterIP()
3340
    if new_name == old_name and new_ip == old_ip:
3341
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3342
                                 " cluster has changed",
3343
                                 errors.ECODE_INVAL)
3344
    if new_ip != old_ip:
3345
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3346
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3347
                                   " reachable on the network" %
3348
                                   new_ip, errors.ECODE_NOTUNIQUE)
3349

    
3350
    self.op.name = new_name
3351

    
3352
  def Exec(self, feedback_fn):
3353
    """Rename the cluster.
3354

3355
    """
3356
    clustername = self.op.name
3357
    new_ip = self.ip
3358

    
3359
    # shutdown the master IP
3360
    master_params = self.cfg.GetMasterNetworkParameters()
3361
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3362
                                                     master_params)
3363
    result.Raise("Could not disable the master role")
3364

    
3365
    try:
3366
      cluster = self.cfg.GetClusterInfo()
3367
      cluster.cluster_name = clustername
3368
      cluster.master_ip = new_ip
3369
      self.cfg.Update(cluster, feedback_fn)
3370

    
3371
      # update the known hosts file
3372
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3373
      node_list = self.cfg.GetOnlineNodeList()
3374
      try:
3375
        node_list.remove(master_params.name)
3376
      except ValueError:
3377
        pass
3378
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3379
    finally:
3380
      master_params.ip = new_ip
3381
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3382
                                                     master_params)
3383
      msg = result.fail_msg
3384
      if msg:
3385
        self.LogWarning("Could not re-enable the master role on"
3386
                        " the master, please restart manually: %s", msg)
3387

    
3388
    return clustername
3389

    
3390

    
3391
def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask))

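# Illustrative usage sketch (assumption, not original code): for an IPv4
# primary family a prefix length such as 24 is accepted, while an
# out-of-range value makes _ValidateNetmask raise errors.OpPrereqError.
#
#   _ValidateNetmask(self.cfg, 24)    # valid, returns None
#   _ValidateNetmask(self.cfg, 99)    # raises errors.OpPrereqError
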
class LUClusterSetParams(LogicalUnit):
3413
  """Change the parameters of the cluster.
3414

3415
  """
3416
  HPATH = "cluster-modify"
3417
  HTYPE = constants.HTYPE_CLUSTER
3418
  REQ_BGL = False
3419

    
3420
  def CheckArguments(self):
3421
    """Check parameters
3422

3423
    """
3424
    if self.op.uid_pool:
3425
      uidpool.CheckUidPool(self.op.uid_pool)
3426

    
3427
    if self.op.add_uids:
3428
      uidpool.CheckUidPool(self.op.add_uids)
3429

    
3430
    if self.op.remove_uids:
3431
      uidpool.CheckUidPool(self.op.remove_uids)
3432

    
3433
    if self.op.master_netmask is not None:
3434
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3435

    
3436
  def ExpandNames(self):
3437
    # FIXME: in the future maybe other cluster params won't require checking on
3438
    # all nodes to be modified.
3439
    self.needed_locks = {
3440
      locking.LEVEL_NODE: locking.ALL_SET,
3441
    }
3442
    self.share_locks[locking.LEVEL_NODE] = 1
3443

    
3444
  def BuildHooksEnv(self):
3445
    """Build hooks env.
3446

3447
    """
3448
    return {
3449
      "OP_TARGET": self.cfg.GetClusterName(),
3450
      "NEW_VG_NAME": self.op.vg_name,
3451
      }
3452

    
3453
  def BuildHooksNodes(self):
3454
    """Build hooks nodes.
3455

3456
    """
3457
    mn = self.cfg.GetMasterNode()
3458
    return ([mn], [mn])
3459

    
3460
  def CheckPrereq(self):
3461
    """Check prerequisites.
3462

3463
    This checks whether the given params don't conflict and
3464
    if the given volume group is valid.
3465

3466
    """
3467
    if self.op.vg_name is not None and not self.op.vg_name:
3468
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3469
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3470
                                   " instances exist", errors.ECODE_INVAL)
3471

    
3472
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3473
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3474
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3475
                                   " drbd-based instances exist",
3476
                                   errors.ECODE_INVAL)
3477

    
3478
    node_list = self.owned_locks(locking.LEVEL_NODE)
3479

    
3480
    # if vg_name not None, checks given volume group on all nodes
3481
    if self.op.vg_name:
3482
      vglist = self.rpc.call_vg_list(node_list)
3483
      for node in node_list:
3484
        msg = vglist[node].fail_msg
3485
        if msg:
3486
          # ignoring down node
3487
          self.LogWarning("Error while gathering data on node %s"
3488
                          " (ignoring node): %s", node, msg)
3489
          continue
3490
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3491
                                              self.op.vg_name,
3492
                                              constants.MIN_VG_SIZE)
3493
        if vgstatus:
3494
          raise errors.OpPrereqError("Error on node '%s': %s" %
3495
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3496

    
3497
    if self.op.drbd_helper:
3498
      # checks given drbd helper on all nodes
3499
      helpers = self.rpc.call_drbd_helper(node_list)
3500
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3501
        if ninfo.offline:
3502
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3503
          continue
3504
        msg = helpers[node].fail_msg
3505
        if msg:
3506
          raise errors.OpPrereqError("Error checking drbd helper on node"
3507
                                     " '%s': %s" % (node, msg),
3508
                                     errors.ECODE_ENVIRON)
3509
        node_helper = helpers[node].payload
3510
        if node_helper != self.op.drbd_helper:
3511
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3512
                                     (node, node_helper), errors.ECODE_ENVIRON)
3513

    
3514
    self.cluster = cluster = self.cfg.GetClusterInfo()
3515
    # validate params changes
3516
    if self.op.beparams:
3517
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3518
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3519

    
3520
    if self.op.ndparams:
3521
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3522
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3523

    
3524
      # TODO: we need a more general way to handle resetting
3525
      # cluster-level parameters to default values
3526
      if self.new_ndparams["oob_program"] == "":
3527
        self.new_ndparams["oob_program"] = \
3528
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3529

    
3530
    if self.op.nicparams:
3531
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3532
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3533
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3534
      nic_errors = []
3535

    
3536
      # check all instances for consistency
3537
      for instance in self.cfg.GetAllInstancesInfo().values():
3538
        for nic_idx, nic in enumerate(instance.nics):
3539
          params_copy = copy.deepcopy(nic.nicparams)
3540
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3541

    
3542
          # check parameter syntax
3543
          try:
3544
            objects.NIC.CheckParameterSyntax(params_filled)
3545
          except errors.ConfigurationError, err:
3546
            nic_errors.append("Instance %s, nic/%d: %s" %
3547
                              (instance.name, nic_idx, err))
3548

    
3549
          # if we're moving instances to routed, check that they have an ip
3550
          target_mode = params_filled[constants.NIC_MODE]
3551
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3552
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3553
                              " address" % (instance.name, nic_idx))
3554
      if nic_errors:
3555
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3556
                                   "\n".join(nic_errors))
3557

    
3558
    # hypervisor list/parameters
3559
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3560
    if self.op.hvparams:
3561
      for hv_name, hv_dict in self.op.hvparams.items():
3562
        if hv_name not in self.new_hvparams:
3563
          self.new_hvparams[hv_name] = hv_dict
3564
        else:
3565
          self.new_hvparams[hv_name].update(hv_dict)
3566

    
3567
    # os hypervisor parameters
3568
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3569
    if self.op.os_hvp:
3570
      for os_name, hvs in self.op.os_hvp.items():
3571
        if os_name not in self.new_os_hvp:
3572
          self.new_os_hvp[os_name] = hvs
3573
        else:
3574
          for hv_name, hv_dict in hvs.items():
3575
            if hv_name not in self.new_os_hvp[os_name]:
3576
              self.new_os_hvp[os_name][hv_name] = hv_dict
3577
            else:
3578
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3579

    
3580
    # os parameters
3581
    self.new_osp = objects.FillDict(cluster.osparams, {})
3582
    if self.op.osparams:
3583
      for os_name, osp in self.op.osparams.items():
3584
        if os_name not in self.new_osp:
3585
          self.new_osp[os_name] = {}
3586

    
3587
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3588
                                                  use_none=True)
3589

    
3590
        if not self.new_osp[os_name]:
3591
          # we removed all parameters
3592
          del self.new_osp[os_name]
3593
        else:
3594
          # check the parameter validity (remote check)
3595
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3596
                         os_name, self.new_osp[os_name])
3597

    
3598
    # changes to the hypervisor list
3599
    if self.op.enabled_hypervisors is not None:
3600
      self.hv_list = self.op.enabled_hypervisors
3601
      for hv in self.hv_list:
3602
        # if the hypervisor doesn't already exist in the cluster
3603
        # hvparams, we initialize it to empty, and then (in both
3604
        # cases) we make sure to fill the defaults, as we might not
3605
        # have a complete defaults list if the hypervisor wasn't
3606
        # enabled before
3607
        if hv not in new_hvp:
3608
          new_hvp[hv] = {}
3609
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3610
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3611
    else:
3612
      self.hv_list = cluster.enabled_hypervisors
3613

    
3614
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3615
      # either the enabled list has changed, or the parameters have, validate
3616
      for hv_name, hv_params in self.new_hvparams.items():
3617
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3618
            (self.op.enabled_hypervisors and
3619
             hv_name in self.op.enabled_hypervisors)):
3620
          # either this is a new hypervisor, or its parameters have changed
3621
          hv_class = hypervisor.GetHypervisor(hv_name)
3622
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3623
          hv_class.CheckParameterSyntax(hv_params)
3624
          _CheckHVParams(self, node_list, hv_name, hv_params)
3625

    
3626
    if self.op.os_hvp:
3627
      # no need to check any newly-enabled hypervisors, since the
3628
      # defaults have already been checked in the above code-block
3629
      for os_name, os_hvp in self.new_os_hvp.items():
3630
        for hv_name, hv_params in os_hvp.items():
3631
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3632
          # we need to fill in the new os_hvp on top of the actual hv_p
3633
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3634
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3635
          hv_class = hypervisor.GetHypervisor(hv_name)
3636
          hv_class.CheckParameterSyntax(new_osp)
3637
          _CheckHVParams(self, node_list, hv_name, new_osp)
3638

    
3639
    if self.op.default_iallocator:
3640
      alloc_script = utils.FindFile(self.op.default_iallocator,
3641
                                    constants.IALLOCATOR_SEARCH_PATH,
3642
                                    os.path.isfile)
3643
      if alloc_script is None:
3644
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3645
                                   " specified" % self.op.default_iallocator,
3646
                                   errors.ECODE_INVAL)
3647

    
3648
  def Exec(self, feedback_fn):
3649
    """Change the parameters of the cluster.
3650

3651
    """
3652
    if self.op.vg_name is not None:
3653
      new_volume = self.op.vg_name
3654
      if not new_volume:
3655
        new_volume = None
3656
      if new_volume != self.cfg.GetVGName():
3657
        self.cfg.SetVGName(new_volume)
3658
      else:
3659
        feedback_fn("Cluster LVM configuration already in desired"
3660
                    " state, not changing")
3661
    if self.op.drbd_helper is not None:
3662
      new_helper = self.op.drbd_helper
3663
      if not new_helper:
3664
        new_helper = None
3665
      if new_helper != self.cfg.GetDRBDHelper():
3666
        self.cfg.SetDRBDHelper(new_helper)
3667
      else:
3668
        feedback_fn("Cluster DRBD helper already in desired state,"
3669
                    " not changing")
3670
    if self.op.hvparams:
3671
      self.cluster.hvparams = self.new_hvparams
3672
    if self.op.os_hvp:
3673
      self.cluster.os_hvp = self.new_os_hvp
3674
    if self.op.enabled_hypervisors is not None:
3675
      self.cluster.hvparams = self.new_hvparams
3676
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3677
    if self.op.beparams:
3678
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3679
    if self.op.nicparams:
3680
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3681
    if self.op.osparams:
3682
      self.cluster.osparams = self.new_osp
3683
    if self.op.ndparams:
3684
      self.cluster.ndparams = self.new_ndparams
3685

    
3686
    if self.op.candidate_pool_size is not None:
3687
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3688
      # we need to update the pool size here, otherwise the save will fail
3689
      _AdjustCandidatePool(self, [])
3690

    
3691
    if self.op.maintain_node_health is not None:
3692
      self.cluster.maintain_node_health = self.op.maintain_node_health
3693

    
3694
    if self.op.prealloc_wipe_disks is not None:
3695
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3696

    
3697
    if self.op.add_uids is not None:
3698
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3699

    
3700
    if self.op.remove_uids is not None:
3701
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3702

    
3703
    if self.op.uid_pool is not None:
3704
      self.cluster.uid_pool = self.op.uid_pool
3705

    
3706
    if self.op.default_iallocator is not None:
3707
      self.cluster.default_iallocator = self.op.default_iallocator
3708

    
3709
    if self.op.reserved_lvs is not None:
3710
      self.cluster.reserved_lvs = self.op.reserved_lvs
3711

    
3712
    if self.op.use_external_mip_script is not None:
3713
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
3714

    
3715
    def helper_os(aname, mods, desc):
3716
      desc += " OS list"
3717
      lst = getattr(self.cluster, aname)
3718
      for key, val in mods:
3719
        if key == constants.DDM_ADD:
3720
          if val in lst:
3721
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3722
          else:
3723
            lst.append(val)
3724
        elif key == constants.DDM_REMOVE:
3725
          if val in lst:
3726
            lst.remove(val)
3727
          else:
3728
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3729
        else:
3730
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3731

    
3732
    if self.op.hidden_os:
3733
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3734

    
3735
    if self.op.blacklisted_os:
3736
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3737

    
3738
    if self.op.master_netdev:
3739
      master_params = self.cfg.GetMasterNetworkParameters()
3740
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3741
                  self.cluster.master_netdev)
3742
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3743
                                                       master_params)
3744
      result.Raise("Could not disable the master ip")
3745
      feedback_fn("Changing master_netdev from %s to %s" %
3746
                  (master_params.netdev, self.op.master_netdev))
3747
      self.cluster.master_netdev = self.op.master_netdev
3748

    
3749
    if self.op.master_netmask:
3750
      master_params = self.cfg.GetMasterNetworkParameters()
3751
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3752
      result = self.rpc.call_node_change_master_netmask(master_params.name,
3753
                                                        master_params.netmask,
3754
                                                        self.op.master_netmask,
3755
                                                        master_params.ip,
3756
                                                        master_params.netdev)
3757
      if result.fail_msg:
3758
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3759
        feedback_fn(msg)
3760

    
3761
      self.cluster.master_netmask = self.op.master_netmask
3762

    
3763
    self.cfg.Update(self.cluster, feedback_fn)
3764

    
3765
    if self.op.master_netdev:
3766
      master_params = self.cfg.GetMasterNetworkParameters()
3767
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3768
                  self.op.master_netdev)
3769
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3770
                                                     master_params)
3771
      if result.fail_msg:
3772
        self.LogWarning("Could not re-enable the master ip on"
3773
                        " the master, please restart manually: %s",
3774
                        result.fail_msg)
3775

    
3776

    
3777
def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)

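# Illustrative usage sketch (assumption): pushing the known-hosts file to a
# list of online nodes, mirroring how LUClusterRename uses this helper.
#
#   _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
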
def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())
  else:
    # we need to ship at least the RAPI certificate
    files_all.add(constants.RAPI_CERT_FILE)

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
    # replication
    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
         "Optional file not in a different required list"

  return (files_all, files_opt, files_mc, files_vm)

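# Illustrative sketch (assumption): shape of the tuple computed above when
# called with redist=True, i.e. when only re-distributable files matter.
#
#   (files_all, files_opt, files_mc, files_vm) = \
#     _ComputeAncillaryFiles(cluster, True)
#   # files_all - files every node needs (known_hosts, HMAC key, RAPI cert, ...)
#   # files_opt - files allowed to be absent, e.g. the RAPI users file
#   # files_mc  - empty here; the config file is only added when redist=False
#   # files_vm  - hypervisor-specific ancillary files
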
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)

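# Illustrative sketch (assumption; node names are hypothetical): the filemap
# built above pairs a node list with the files to push there, e.g.
#
#   filemap = [
#     (["node2.example.com", "node3.example.com"], files_all),
#     (["node3.example.com"], files_vm),
#     ]
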
class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)

class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    self.rpc.call_node_activate_master_ip(master_params.name,
                                          master_params)


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    self.rpc.call_node_deactivate_master_ip(master_params.name, master_params)

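# Illustrative sketch (assumption, not original code): both LUs above are thin
# wrappers around a single node RPC, e.g. activating the master IP reduces to:
#
#   params = self.cfg.GetMasterNetworkParameters()
#   self.rpc.call_node_activate_master_ip(params.name, params)
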
def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

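# Illustrative summary (assumption, not original code) of the polling logic
# above: the status RPC is repeated after min(60s, longest estimated sync
# time); RPC failures are retried up to 10 times with a 6 second pause; a
# "done but degraded" answer is re-checked once per second for up to
# degr_retries seconds before being reported.
#
#   in_sync = _WaitForSync(self, instance)   # True when no disk stays degraded
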
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result

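# Illustrative usage sketch (assumption): checking one instance disk on its
# primary node, first for overall health and then only for the local storage
# status (ldisk=True).
#
#   ok = _CheckDiskConsistency(self, disk, pnode, True)
#   local_ok = _CheckDiskConsistency(self, disk, pnode, True, ldisk=True)
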
class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in