#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc

import ganeti.masterd.instance # pylint: disable=W0611


#: Size of DRBD meta block device
DRBD_META_SIZE = 128


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcodes.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs
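
# Illustrative sketch (not part of the original module): an LU's Exec can hand
# follow-up work to the job queue by returning an instance of the
# ResultWithJobs class above. The opcode and keyword names below are only
# examples.
#
#   def Exec(self, feedback_fn):
#     jobs = [
#       [opcodes.OpInstanceReboot(instance_name="inst1.example.com")],
#       [opcodes.OpInstanceReboot(instance_name="inst2.example.com")],
#     ]
#     # extra keyword arguments are stored in the "other" attribute as
#     # additional return values
#     return ResultWithJobs(jobs, rebooted=2)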

    
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc_runner):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc_runner
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and the
    # "could be a function" warnings
    # pylint: disable=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
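
# Illustrative sketch (not part of the original module): the minimal shape of a
# concrete LU following the rules listed in the LogicalUnit docstring above.
# The opcode it would serve, the hook path and the returned values are
# hypothetical placeholders.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {} # this example needs no locks
#
#     def CheckPrereq(self):
#       pass # nothing to verify in this example
#
#     def BuildHooksEnv(self):
#       return {"OP_TARGET": self.cfg.GetClusterName()}
#
#     def BuildHooksNodes(self):
#       return ([], [self.cfg.GetMasterNode()])
#
#     def Exec(self, feedback_fn):
#       feedback_fn("doing nothing")
#       return True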

    
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
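
# Illustrative sketch (not part of the original module): a minimal tasklet and
# how an LU would wire it in; locking stays in the LU, as the Tasklet docstring
# above notes. All names here are hypothetical.
#
#   class _ExampleTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       pass # e.g. check that the instance still exists
#
#     def Exec(self, feedback_fn):
#       feedback_fn("working on %s" % self.instance_name)
#
#   # ...and, inside some LU's ExpandNames, after declaring the needed locks:
#   #   self.tasklets = [_ExampleTasklet(self, name) for name in names]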

    
class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, qfilter, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Sort only if no names were requested
    self.sort_by_name = not self.names

    self.do_locking = None
    self.wanted = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    if self.do_locking:
      names = lu.owned_locks(lock_level)
    else:
      names = all_names

    if self.wanted == locking.ALL_SET:
      assert not self.names
      # caller didn't specify names, so ordering is not important
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return self.query.OldStyleQuery(self._GetQueryData(lu),
                                    sort_by_name=self.sort_by_name)


def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)
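
# Illustrative note (not part of the original module): _ShareAll simply maps
# every level in locking.LEVELS to 1, so an LU that wants to acquire all its
# locks in shared mode can declare, for example:
#
#   self.share_locks = _ShareAll()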

    
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups"
                               " are '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if nodes:
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
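
# Illustrative sketch (not part of the original module) of the merge semantics
# of _GetUpdatedParams, assuming constants.VALUE_DEFAULT is the string
# "default":
#
#   >>> _GetUpdatedParams({"a": 1, "b": 2}, {"a": "default", "c": 3})
#   {'c': 3, 'b': 2}        # "a" reset to its default, "c" added, "b" kept
#   >>> _GetUpdatedParams({"a": 1}, {"a": None}, use_none=True)
#   {}                      # None deletes the key when use_none=True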

    
def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit owning the locks
  @type level: member of ganeti.locking.LEVELS
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
         "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  if should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in lu.owned_locks(level):
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
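
# Illustrative usage (not part of the original module): once an LU has narrowed
# down which objects it really needs, it can drop the remaining locks, e.g.
# (hypothetical attribute names):
#
#   # keep only the locks of the instance's own nodes, release the rest
#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.instance.all_nodes)
#
# Passing names=[...] releases exactly those locks instead; passing neither
# releases everything owned at that level.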

    
def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)
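
# Illustrative note (not part of the original module): the mapping returned by
# _MapInstanceDisksToNodes has this shape (hypothetical node and volume names):
#
#   {("node1.example.com", "xenvg/disk0"): "inst1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "inst1.example.com"}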

    
def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # pylint: disable=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
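
# Illustrative note (not part of the original module): the _Expand* helpers
# above turn a possibly abbreviated name into the fully-qualified name known to
# the configuration, or raise OpPrereqError with ECODE_NOENT. For example, with
# a hypothetical cluster:
#
#   >>> _ExpandInstanceName(self.cfg, "inst1")
#   'inst1.example.com'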

    
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  if not tags:
    tags = []

  env["INSTANCE_TAGS"] = " ".join(tags)

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
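
# Illustrative note (not part of the original module): for an instance with one
# NIC and one disk, _BuildInstanceHookEnv above produces keys along these lines
# (the hooks runner later prefixes each with "GANETI_"):
#
#   OP_TARGET, INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
#   INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
#   INSTANCE_DISK_TEMPLATE, INSTANCE_HYPERVISOR, INSTANCE_NIC_COUNT,
#   INSTANCE_NIC0_IP, INSTANCE_NIC0_MAC, INSTANCE_NIC0_MODE, INSTANCE_NIC0_LINK,
#   INSTANCE_DISK_COUNT, INSTANCE_DISK0_SIZE, INSTANCE_DISK0_MODE,
#   INSTANCE_TAGS, INSTANCE_BE_*, INSTANCE_HV_*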

    
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_up,
    "memory": bep[constants.BE_MEMORY],
    "vcpus": bep[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": bep,
    "hvp": hvp,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both iallocator and node",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator")
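
# Illustrative note (not part of the original module): the three outcomes of
# _CheckIAllocatorOrNode above, for an opcode with slots "iallocator" and
# "pnode" (hypothetical slot name):
#
#   both slots set      -> OpPrereqError, they are mutually exclusive
#   exactly one set     -> the opcode is left untouched
#   neither set         -> the iallocator slot is filled with the cluster-wide
#                          default, or OpPrereqError if no default is set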

    
def _GetDefaultIAllocator(cfg, iallocator):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type iallocator: string or None
  @param iallocator: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not iallocator:
    # Use default iallocator
    iallocator = cfg.GetDefaultIAllocator()

  if not iallocator:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return iallocator


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master_params.name)

    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params.ip,
                                                     master_params.netmask,
                                                     master_params.netdev,
                                                     master_params.ip_family)
    result.Raise("Could not disable the master role")

    return master_params.name


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101

  def _ErrorIf(self, cond, ecode, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable=E1101

    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    (_, etxt, _) = ecode
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING

    if cond:
      self._Error(ecode, *args, **kwargs)

    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
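
# Illustrative note (not part of the original module): depending on the
# opcode's error_codes flag, _Error above reports either the machine-parseable
# form "ERROR:<etxt>:<itype>:<item>:<message>" or the human-readable form
# "ERROR: <itype> <item>: <message>" (WARNING instead of ERROR for demoted
# errors), each prefixed with "  - " when passed to feedback_fn.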

    
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
                                            ignore_errors=self.op.ignore_errors,
                                            depends=depends_fn())]
                for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
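
# Illustrative note (not part of the original module), based on the code above:
# when verifying all groups, depends_fn yields [(-len(jobs), [])], i.e. a
# relative dependency on the job submitted len(jobs) positions earlier in the
# same submission, which is always the OpClusterVerifyConfig job appended
# first; the empty list is the per-dependency list of requested end statuses.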

    
1541
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1542
  """Verifies the cluster config.
1543

1544
  """
1545
  REQ_BGL = True
1546

    
1547
  def _VerifyHVP(self, hvp_data):
1548
    """Verifies locally the syntax of the hypervisor parameters.
1549

1550
    """
1551
    for item, hv_name, hv_params in hvp_data:
1552
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1553
             (item, hv_name))
1554
      try:
1555
        hv_class = hypervisor.GetHypervisor(hv_name)
1556
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1557
        hv_class.CheckParameterSyntax(hv_params)
1558
      except errors.GenericError, err:
1559
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1560

    
1561
  def ExpandNames(self):
1562
    # Information can be safely retrieved as the BGL is acquired in exclusive
1563
    # mode
1564
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1565
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1566
    self.all_node_info = self.cfg.GetAllNodesInfo()
1567
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1568
    self.needed_locks = {}
1569

    
1570
  def Exec(self, feedback_fn):
1571
    """Verify integrity of cluster, performing various test on nodes.
1572

1573
    """
1574
    self.bad = False
1575
    self._feedback_fn = feedback_fn
1576

    
1577
    feedback_fn("* Verifying cluster config")
1578

    
1579
    for msg in self.cfg.VerifyConfig():
1580
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1581

    
1582
    feedback_fn("* Verifying cluster certificate files")
1583

    
1584
    for cert_filename in constants.ALL_CERT_FILES:
1585
      (errcode, msg) = _VerifyCertificate(cert_filename)
1586
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1587

    
1588
    feedback_fn("* Verifying hypervisor parameters")
1589

    
1590
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1591
                                                self.all_inst_info.values()))
1592

    
1593
    feedback_fn("* Verifying all nodes belong to an existing group")
1594

    
1595
    # We do this verification here because, should this bogus circumstance
1596
    # occur, it would never be caught by VerifyGroup, which only acts on
1597
    # nodes/instances reachable from existing node groups.
1598

    
1599
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1600
                         if node.group not in self.all_group_info)
1601

    
1602
    dangling_instances = {}
1603
    no_node_instances = []
1604

    
1605
    for inst in self.all_inst_info.values():
1606
      if inst.primary_node in dangling_nodes:
1607
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1608
      elif inst.primary_node not in self.all_node_info:
1609
        no_node_instances.append(inst.name)
1610

    
1611
    pretty_dangling = [
1612
        "%s (%s)" %
1613
        (node.name,
1614
         utils.CommaJoin(dangling_instances.get(node.name,
1615
                                                ["no instances"])))
1616
        for node in dangling_nodes]
1617

    
1618
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1619
                  None,
1620
                  "the following nodes (and their instances) belong to a non"
1621
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1622

    
1623
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1624
                  None,
1625
                  "the following instances have a non-existing primary-node:"
1626
                  " %s", utils.CommaJoin(no_node_instances))
1627

    
1628
    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)
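    # Unlike the protocol version check above, a release (package) version
    # mismatch is only reported as a warning and does not abort verification
    # of this node.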
1828

    
1829
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1830
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1831
      for hv_name, hv_result in hyp_result.iteritems():
1832
        test = hv_result is not None
1833
        _ErrorIf(test, constants.CV_ENODEHV, node,
1834
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1835

    
1836
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1837
    if ninfo.vm_capable and isinstance(hvp_result, list):
1838
      for item, hv_name, hv_result in hvp_result:
1839
        _ErrorIf(True, constants.CV_ENODEHV, node,
1840
                 "hypervisor %s parameter verify failure (source %s): %s",
1841
                 hv_name, item, hv_result)
1842

    
1843
    test = nresult.get(constants.NV_NODESETUP,
1844
                       ["Missing NODESETUP results"])
1845
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1846
             "; ".join(test))
1847

    
1848
    return True
1849

    
1850
  def _VerifyNodeTime(self, ninfo, nresult,
1851
                      nvinfo_starttime, nvinfo_endtime):
1852
    """Check the node time.
1853

1854
    @type ninfo: L{objects.Node}
1855
    @param ninfo: the node to check
1856
    @param nresult: the remote results for the node
1857
    @param nvinfo_starttime: the start time of the RPC call
1858
    @param nvinfo_endtime: the end time of the RPC call
1859

1860
    """
1861
    node = ninfo.name
1862
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1863

    
1864
    ntime = nresult.get(constants.NV_TIME, None)
1865
    try:
1866
      ntime_merged = utils.MergeTime(ntime)
1867
    except (ValueError, TypeError):
1868
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1869
      return
1870

    
1871
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1872
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1873
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1874
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1875
    else:
1876
      ntime_diff = None
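    # Anything within [start - NODE_MAX_CLOCK_SKEW, end + NODE_MAX_CLOCK_SKEW]
    # is accepted, so RPC latency is not misreported as clock drift.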

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)
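        # Example: two auto-balanced instances of 2048 MiB and 1024 MiB whose
        # primary is prinode would require 3072 MiB free on this node.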

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]
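    # files_opt is deliberately not mapped here: optional files may be absent,
    # but then they must be absent from every candidate node (checked below).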

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
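      # zip() keeps the original submission order, so every status lands on
      # the (instance, node) entry of the disk with the same index.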

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])
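    # One endless iterator per foreign node group; cycling them lets
    # successive calls pick a different target node from each group.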

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }
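    # Every NV_* key requests one class of checks from the node; the replies
    # come back under the same keys and are dispatched to the
    # _Verify*/_Update* helpers below.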

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)
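        # nimg.sbp maps each primary node to the instances for which this
        # node is the secondary; _VerifyNPlusOneMemory builds on this.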
2735

    
2736
    # At this point, we have the in-memory data structures complete,
2737
    # except for the runtime information, which we'll gather next
2738

    
2739
    # Due to the way our RPC system works, exact response times cannot be
2740
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2741
    # time before and after executing the request, we can at least have a time
2742
    # window.
2743
    nvinfo_starttime = time.time()
2744
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2745
                                           node_verify_param,
2746
                                           self.cfg.GetClusterName())
2747
    nvinfo_endtime = time.time()
2748

    
2749
    if self.extra_lv_nodes and vg_name is not None:
2750
      extra_lv_nvinfo = \
2751
          self.rpc.call_node_verify(self.extra_lv_nodes,
2752
                                    {constants.NV_LVLIST: vg_name},
2753
                                    self.cfg.GetClusterName())
2754
    else:
2755
      extra_lv_nvinfo = {}
2756

    
2757
    all_drbd_map = self.cfg.ComputeDRBDMap()
2758

    
2759
    feedback_fn("* Gathering disk information (%s nodes)" %
2760
                len(self.my_node_names))
2761
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2762
                                     self.my_inst_info)
2763

    
2764
    feedback_fn("* Verifying configuration file consistency")
2765

    
2766
    # If not all nodes are being checked, we need to make sure the master node
2767
    # and a non-checked vm_capable node are in the list.
2768
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2769
    if absent_nodes:
2770
      vf_nvinfo = all_nvinfo.copy()
2771
      vf_node_info = list(self.my_node_info.values())
2772
      additional_nodes = []
2773
      if master_node not in self.my_node_info:
2774
        additional_nodes.append(master_node)
2775
        vf_node_info.append(self.all_node_info[master_node])
2776
      # Add the first vm_capable node we find which is not included
2777
      for node in absent_nodes:
2778
        nodeinfo = self.all_node_info[node]
2779
        if nodeinfo.vm_capable and not nodeinfo.offline:
2780
          additional_nodes.append(node)
2781
          vf_node_info.append(self.all_node_info[node])
2782
          break
2783
      key = constants.NV_FILELIST
2784
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2785
                                                 {key: node_verify_param[key]},
2786
                                                 self.cfg.GetClusterName()))
2787
    else:
2788
      vf_nvinfo = all_nvinfo
2789
      vf_node_info = self.my_node_info.values()
2790

    
2791
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2792

    
2793
    feedback_fn("* Verifying node status")
2794

    
2795
    refos_img = None
2796

    
2797
    for node_i in node_data_list:
2798
      node = node_i.name
2799
      nimg = node_image[node]
2800

    
2801
      if node_i.offline:
2802
        if verbose:
2803
          feedback_fn("* Skipping offline node %s" % (node,))
2804
        n_offline += 1
2805
        continue
2806

    
2807
      if node == master_node:
2808
        ntype = "master"
2809
      elif node_i.master_candidate:
2810
        ntype = "master candidate"
2811
      elif node_i.drained:
2812
        ntype = "drained"
2813
        n_drained += 1
2814
      else:
2815
        ntype = "regular"
2816
      if verbose:
2817
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2818

    
2819
      msg = all_nvinfo[node].fail_msg
2820
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2821
               msg)
2822
      if msg:
2823
        nimg.rpc_fail = True
2824
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyOob(node_i, nresult)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               constants.CV_EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1,
                      constants.CV_EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                 snode, "instance %s, connection to secondary node failed",
                 instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)
2932

    
2933
      # warn that the instance lives on offline nodes
2934
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2935
               "instance has offline secondary node(s) %s",
2936
               utils.CommaJoin(inst_nodes_offline))
2937
      # ... or ghost/non-vm_capable nodes
2938
      for node in inst_config.all_nodes:
2939
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2940
                 instance, "instance lives on ghost node %s", node)
2941
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2942
                 instance, "instance lives on non-vm_capable node %s", node)
2943

    
2944
    feedback_fn("* Verifying orphan volumes")
2945
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2946

    
2947
    # We will get spurious "unknown volume" warnings if any node of this group
2948
    # is secondary for an instance whose primary is in another group. To avoid
2949
    # them, we find these instances and add their volumes to node_vol_should.
2950
    for inst in self.all_inst_info.values():
2951
      for secondary in inst.secondary_nodes:
2952
        if (secondary in self.my_node_info
2953
            and inst.name not in self.my_inst_info):
2954
          inst.MapLVsByNode(node_vol_should)
2955
          break
2956

    
2957
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2958

    
2959
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2960
      feedback_fn("* Verifying N+1 Memory redundancy")
2961
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2962

    
2963
    feedback_fn("* Other Notes")
2964
    if i_non_redundant:
2965
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2966
                  % len(i_non_redundant))
2967

    
2968
    if i_non_a_balanced:
2969
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2970
                  % len(i_non_a_balanced))
2971

    
2972
    if n_offline:
2973
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2974

    
2975
    if n_drained:
2976
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2977

    
2978
    return not self.bad
2979

    
2980
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2981
    """Analyze the post-hooks' result
2982

2983
    This method analyses the hook result, handles it, and sends some
2984
    nicely-formatted feedback back to the user.
2985

2986
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2987
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2988
    @param hooks_results: the results of the multi-node hooks rpc call
2989
    @param feedback_fn: function used to send feedback back to the caller
2990
    @param lu_result: previous Exec result
2991
    @return: the new Exec result, based on the previous result
2992
        and hook results
2993

2994
    """
2995
    # We only really run POST phase hooks, only for non-empty groups,
2996
    # and are only interested in their results
2997
    if not self.my_node_names:
2998
      # empty node group
2999
      pass
3000
    elif phase == constants.HOOKS_PHASE_POST:
3001
      # Used to change hooks' output to proper indentation
3002
      feedback_fn("* Hooks Results")
3003
      assert hooks_results, "invalid result from hooks"
3004

    
3005
      for node_name in hooks_results:
3006
        res = hooks_results[node_name]
3007
        msg = res.fail_msg
3008
        test = msg and not res.offline
3009
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3010
                      "Communication failure in hooks execution: %s", msg)
3011
        if res.offline or msg:
3012
          # No need to investigate payload if node is offline or gave
3013
          # an error.
3014
          continue
3015
        for script, hkr, output in res.payload:
3016
          test = hkr == constants.HKR_FAIL
3017
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3018
                        "Script %s failed, output:", script)
3019
          if test:
3020
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3021
            feedback_fn("%s" % output)
3022
            lu_result = False
3023

    
3024
    return lu_result
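

# The split-group warning in the node-group verification Exec above boils
# down to bucketing an instance's nodes by node group and flagging instances
# whose nodes end up in more than one bucket. The helper below is only an
# illustrative sketch of that bucketing over plain dicts; its name and
# signature are hypothetical and not part of Ganeti's API.
def _ExampleFindSplitInstances(inst_to_nodes, node_to_group):
  """Sketch: find instances whose nodes span more than one node group.

  @type inst_to_nodes: dict
  @param inst_to_nodes: instance name to a list of all its node names
  @type node_to_group: dict
  @param node_to_group: node name to the name (or UUID) of its node group
  @return: dict of instance name to the sorted list of groups it spans,
      containing only instances that span more than one group

  """
  split = {}
  for (inst, nodes) in inst_to_nodes.items():
    groups = {}
    for node in nodes:
      # same bucketing idea as instance_groups.setdefault(...) in Exec above
      groups.setdefault(node_to_group[node], []).append(node)
    if len(groups) > 1:
      split[inst] = sorted(groups)
  return split

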
class LUClusterVerifyDisks(NoHooksLU):
3028
  """Verifies the cluster disks status.
3029

3030
  """
3031
  REQ_BGL = False
3032

    
3033
  def ExpandNames(self):
3034
    self.share_locks = _ShareAll()
3035
    self.needed_locks = {
3036
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3037
      }
3038

    
3039
  def Exec(self, feedback_fn):
3040
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3041

    
3042
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3043
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3044
                           for group in group_names])
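

# LUClusterVerifyDisks above fans the work out as one job per node group,
# each job containing a single group-level opcode. The sketch below shows
# only that fan-out shape, with stand-in dicts instead of
# opcodes.OpGroupVerifyDisks instances; the helper name and dict keys are
# hypothetical.
def _ExampleJobsPerGroup(group_names):
  """Sketch: build one single-opcode job per node group.

  @param group_names: iterable of node group names
  @return: list of jobs, each job being a list with one opcode-like dict

  """
  return [[{"group_name": group}] for group in group_names]

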
class LUGroupVerifyDisks(NoHooksLU):
3048
  """Verifies the status of all disks in a node group.
3049

3050
  """
3051
  REQ_BGL = False
3052

    
3053
  def ExpandNames(self):
3054
    # Raises errors.OpPrereqError on its own if group can't be found
3055
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3056

    
3057
    self.share_locks = _ShareAll()
3058
    self.needed_locks = {
3059
      locking.LEVEL_INSTANCE: [],
3060
      locking.LEVEL_NODEGROUP: [],
3061
      locking.LEVEL_NODE: [],
3062
      }
3063

    
3064
  def DeclareLocks(self, level):
3065
    if level == locking.LEVEL_INSTANCE:
3066
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3067

    
3068
      # Lock instances optimistically, needs verification once node and group
3069
      # locks have been acquired
3070
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3071
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3072

    
3073
    elif level == locking.LEVEL_NODEGROUP:
3074
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3075

    
3076
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3077
        set([self.group_uuid] +
3078
            # Lock all groups used by instances optimistically; this requires
3079
            # going via the node before it's locked, requiring verification
3080
            # later on
3081
            [group_uuid
3082
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3083
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3084

    
3085
    elif level == locking.LEVEL_NODE:
3086
      # This will only lock the nodes in the group to be verified which contain
3087
      # actual instances
3088
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3089
      self._LockInstancesNodes()
3090

    
3091
      # Lock all nodes in group to be verified
3092
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3093
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3094
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3095

    
3096
  def CheckPrereq(self):
3097
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3098
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3099
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3100

    
3101
    assert self.group_uuid in owned_groups
3102

    
3103
    # Check if locked instances are still correct
3104
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3105

    
3106
    # Get instance information
3107
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3108

    
3109
    # Check if node groups for locked instances are still correct
3110
    for (instance_name, inst) in self.instances.items():
3111
      assert owned_nodes.issuperset(inst.all_nodes), \
3112
        "Instance %s's nodes changed while we kept the lock" % instance_name
3113

    
3114
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3115
                                             owned_groups)
3116

    
3117
      assert self.group_uuid in inst_groups, \
3118
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3119

    
3120
  def Exec(self, feedback_fn):
3121
    """Verify integrity of cluster disks.
3122

3123
    @rtype: tuple of three items
3124
    @return: a tuple of (dict of node-to-node_error, list of instances
3125
        which need activate-disks, dict of instance: (node, volume) for
3126
        missing volumes)
3127

3128
    """
3129
    res_nodes = {}
3130
    res_instances = set()
3131
    res_missing = {}
3132

    
3133
    nv_dict = _MapInstanceDisksToNodes([inst
3134
                                        for inst in self.instances.values()
3135
                                        if inst.admin_up])
3136

    
3137
    if nv_dict:
3138
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3139
                             set(self.cfg.GetVmCapableNodeList()))
3140

    
3141
      node_lvs = self.rpc.call_lv_list(nodes, [])
3142

    
3143
      for (node, node_res) in node_lvs.items():
3144
        if node_res.offline:
3145
          continue
3146

    
3147
        msg = node_res.fail_msg
3148
        if msg:
3149
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3150
          res_nodes[node] = msg
3151
          continue
3152

    
3153
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3154
          inst = nv_dict.pop((node, lv_name), None)
3155
          if not (lv_online or inst is None):
3156
            res_instances.add(inst)
3157

    
3158
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3159
      # better
3160
      for key, inst in nv_dict.iteritems():
3161
        res_missing.setdefault(inst, []).append(list(key))
3162

    
3163
    return (res_nodes, list(res_instances), res_missing)
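

# The LV bookkeeping in the Exec method above expects a (node, lv_name) to
# instance mapping, crosses off every LV a node actually reports, and treats
# whatever is left as missing; LVs reported as offline mark their owning
# instance as needing activate-disks. The function below is a self-contained
# sketch of that bookkeeping over plain dicts; its name and the shape of
# node_reports are assumptions of the sketch, not Ganeti RPC payloads.
def _ExampleCheckGroupLvs(expected, node_reports):
  """Sketch: classify expected LVs into offline and missing ones.

  @type expected: dict
  @param expected: (node name, lv name) to owning instance name
  @type node_reports: dict
  @param node_reports: node name to a dict of lv name to an "online" flag
  @return: tuple of (set of instances with offline LVs, dict of instance
      to the list of missing (node name, lv name) pairs)

  """
  remaining = dict(expected)
  need_activate = set()
  for (node, lvs) in node_reports.items():
    for (lv_name, lv_online) in lvs.items():
      inst = remaining.pop((node, lv_name), None)
      if inst is not None and not lv_online:
        need_activate.add(inst)
  # anything never reported by its node is a missing volume
  missing = {}
  for (key, inst) in remaining.items():
    missing.setdefault(inst, []).append(key)
  return (need_activate, missing)

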
class LUClusterRepairDiskSizes(NoHooksLU):
3167
  """Verifies the cluster disks sizes.
3168

3169
  """
3170
  REQ_BGL = False
3171

    
3172
  def ExpandNames(self):
3173
    if self.op.instances:
3174
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3175
      self.needed_locks = {
3176
        locking.LEVEL_NODE: [],
3177
        locking.LEVEL_INSTANCE: self.wanted_names,
3178
        }
3179
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3180
    else:
3181
      self.wanted_names = None
3182
      self.needed_locks = {
3183
        locking.LEVEL_NODE: locking.ALL_SET,
3184
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3185
        }
3186
    self.share_locks = _ShareAll()
3187

    
3188
  def DeclareLocks(self, level):
3189
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
3190
      self._LockInstancesNodes(primary_only=True)
3191

    
3192
  def CheckPrereq(self):
3193
    """Check prerequisites.
3194

3195
    This only checks the optional instance list against the existing names.
3196

3197
    """
3198
    if self.wanted_names is None:
3199
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3200

    
3201
    self.wanted_instances = \
3202
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3203

    
3204
  def _EnsureChildSizes(self, disk):
3205
    """Ensure children of the disk have the needed disk size.
3206

3207
    This is valid mainly for DRBD8 and fixes an issue where the
3208
    children have smaller disk size.
3209

3210
    @param disk: an L{ganeti.objects.Disk} object
3211

3212
    """
3213
    if disk.dev_type == constants.LD_DRBD8:
3214
      assert disk.children, "Empty children for DRBD8?"
3215
      fchild = disk.children[0]
3216
      mismatch = fchild.size < disk.size
3217
      if mismatch:
3218
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3219
                     fchild.size, disk.size)
3220
        fchild.size = disk.size
3221

    
3222
      # and we recurse on this child only, not on the metadev
3223
      return self._EnsureChildSizes(fchild) or mismatch
3224
    else:
3225
      return False
3226

    
3227
  def Exec(self, feedback_fn):
3228
    """Verify the size of cluster disks.
3229

3230
    """
3231
    # TODO: check child disks too
3232
    # TODO: check differences in size between primary/secondary nodes
3233
    per_node_disks = {}
3234
    for instance in self.wanted_instances:
3235
      pnode = instance.primary_node
3236
      if pnode not in per_node_disks:
3237
        per_node_disks[pnode] = []
3238
      for idx, disk in enumerate(instance.disks):
3239
        per_node_disks[pnode].append((instance, idx, disk))
3240

    
3241
    changed = []
3242
    for node, dskl in per_node_disks.items():
3243
      newl = [v[2].Copy() for v in dskl]
3244
      for dsk in newl:
3245
        self.cfg.SetDiskID(dsk, node)
3246
      result = self.rpc.call_blockdev_getsize(node, newl)
3247
      if result.fail_msg:
3248
        self.LogWarning("Failure in blockdev_getsize call to node"
3249
                        " %s, ignoring", node)
3250
        continue
3251
      if len(result.payload) != len(dskl):
3252
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3253
                        " result.payload=%s", node, len(dskl), result.payload)
3254
        self.LogWarning("Invalid result from node %s, ignoring node results",
3255
                        node)
3256
        continue
3257
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3258
        if size is None:
3259
          self.LogWarning("Disk %d of instance %s did not return size"
3260
                          " information, ignoring", idx, instance.name)
3261
          continue
3262
        if not isinstance(size, (int, long)):
3263
          self.LogWarning("Disk %d of instance %s did not return valid"
3264
                          " size information, ignoring", idx, instance.name)
3265
          continue
3266
        size = size >> 20
3267
        if size != disk.size:
3268
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3269
                       " correcting: recorded %d, actual %d", idx,
3270
                       instance.name, disk.size, size)
3271
          disk.size = size
3272
          self.cfg.Update(instance, feedback_fn)
3273
          changed.append((instance.name, idx, size))
3274
        if self._EnsureChildSizes(disk):
3275
          self.cfg.Update(instance, feedback_fn)
3276
          changed.append((instance.name, idx, disk.size))
3277
    return changed
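

# The Exec method above compares the disk size recorded in the configuration
# (in MiB) with what the node reports (in bytes), using "size >> 20" for the
# conversion. The helper below is only a sketch of that reconciliation over
# plain dicts; its name and parameters are hypothetical.
def _ExampleDiskSizeCorrections(recorded_mib, reported_bytes):
  """Sketch: compute per-disk size corrections in MiB.

  @type recorded_mib: dict
  @param recorded_mib: disk index to the size recorded in the config (MiB)
  @type reported_bytes: dict
  @param reported_bytes: disk index to the size reported by the node (bytes)
  @return: dict of disk index to corrected size in MiB, only for disks
      whose recorded size disagrees with the node

  """
  corrections = {}
  for (idx, nbytes) in reported_bytes.items():
    if not isinstance(nbytes, (int, long)):
      # mirror the type check in Exec above and skip bogus payloads
      continue
    actual_mib = nbytes >> 20 # bytes to MiB, i.e. division by 2**20
    if actual_mib != recorded_mib.get(idx):
      corrections[idx] = actual_mib
  return corrections

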
class LUClusterRename(LogicalUnit):
3281
  """Rename the cluster.
3282

3283
  """
3284
  HPATH = "cluster-rename"
3285
  HTYPE = constants.HTYPE_CLUSTER
3286

    
3287
  def BuildHooksEnv(self):
3288
    """Build hooks env.
3289

3290
    """
3291
    return {
3292
      "OP_TARGET": self.cfg.GetClusterName(),
3293
      "NEW_NAME": self.op.name,
3294
      }
3295

    
3296
  def BuildHooksNodes(self):
3297
    """Build hooks nodes.
3298

3299
    """
3300
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3301

    
3302
  def CheckPrereq(self):
3303
    """Verify that the passed name is a valid one.
3304

3305
    """
3306
    hostname = netutils.GetHostname(name=self.op.name,
3307
                                    family=self.cfg.GetPrimaryIPFamily())
3308

    
3309
    new_name = hostname.name
3310
    self.ip = new_ip = hostname.ip
3311
    old_name = self.cfg.GetClusterName()
3312
    old_ip = self.cfg.GetMasterIP()
3313
    if new_name == old_name and new_ip == old_ip:
3314
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3315
                                 " cluster has changed",
3316
                                 errors.ECODE_INVAL)
3317
    if new_ip != old_ip:
3318
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3319
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3320
                                   " reachable on the network" %
3321
                                   new_ip, errors.ECODE_NOTUNIQUE)
3322

    
3323
    self.op.name = new_name
3324

    
3325
  def Exec(self, feedback_fn):
3326
    """Rename the cluster.
3327

3328
    """
3329
    clustername = self.op.name
3330
    new_ip = self.ip
3331

    
3332
    # shutdown the master IP
3333
    master_params = self.cfg.GetMasterNetworkParameters()
3334
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3335
                                                     master_params.ip,
3336
                                                     master_params.netmask,
3337
                                                     master_params.netdev,
3338
                                                     master_params.ip_family)
3339
    result.Raise("Could not disable the master role")
3340

    
3341
    try:
3342
      cluster = self.cfg.GetClusterInfo()
3343
      cluster.cluster_name = clustername
3344
      cluster.master_ip = new_ip
3345
      self.cfg.Update(cluster, feedback_fn)
3346

    
3347
      # update the known hosts file
3348
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3349
      node_list = self.cfg.GetOnlineNodeList()
3350
      try:
3351
        node_list.remove(master_params.name)
3352
      except ValueError:
3353
        pass
3354
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3355
    finally:
3356
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3357
                                                     new_ip,
3358
                                                     master_params.netmask,
3359
                                                     master_params.netdev,
3360
                                                     master_params.ip_family)
3361
      msg = result.fail_msg
3362
      if msg:
3363
        self.LogWarning("Could not re-enable the master role on"
3364
                        " the master, please restart manually: %s", msg)
3365

    
3366
    return clustername
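

# The Exec method above follows a deactivate/update/reactivate pattern: the
# master IP is taken down, the configuration and known_hosts file are
# updated, and the IP is brought back up in a finally block so that even a
# failed update ends with a best-effort reactivation. Below is a minimal
# sketch of that pattern with plain callables standing in for the RPC calls;
# every name here is hypothetical.
def _ExampleUpdateWithIpFlip(deactivate_fn, update_fn, activate_fn, warn_fn):
  """Sketch: run an update between master IP deactivation and reactivation.

  @param deactivate_fn: callable taking the master IP down
  @param update_fn: callable performing the actual update
  @param activate_fn: callable bringing the master IP back up, returning
      True on success
  @param warn_fn: callable used to report a failed reactivation

  """
  deactivate_fn()
  try:
    update_fn()
  finally:
    # always attempt to bring the IP back, even if the update failed
    if not activate_fn():
      warn_fn("Could not re-activate the master IP, please do so manually")

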
def _ValidateNetmask(cfg, netmask):
3370
  """Checks if a netmask is valid.
3371

3372
  @type cfg: L{config.ConfigWriter}
3373
  @param cfg: The cluster configuration
3374
  @type netmask: int
3375
  @param netmask: the netmask to be verified
3376
  @raise errors.OpPrereqError: if the validation fails
3377

3378
  """
3379
  ip_family = cfg.GetPrimaryIPFamily()
3380
  try:
3381
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
3382
  except errors.ProgrammerError:
3383
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
3384
                               ip_family)
3385
  if not ipcls.ValidateNetmask(netmask):
3386
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
3387
                                (netmask))
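

# _ValidateNetmask above delegates the real check to the netutils.IPAddress
# class of the cluster's primary IP family. Reduced to plain arithmetic, a
# CIDR netmask is just a prefix length that has to fit the address family.
# The sketch below makes that explicit; whether a /0 prefix should be
# rejected is an assumption of the sketch, and the function name is
# hypothetical.
def _ExampleValidPrefixLen(netmask, ipv6=False):
  """Sketch: check that a prefix length fits the address family.

  @type netmask: int
  @param netmask: the prefix length to check
  @type ipv6: boolean
  @param ipv6: check against IPv6 (128 bits) instead of IPv4 (32 bits)
  @rtype: boolean

  """
  if ipv6:
    upper = 128
  else:
    upper = 32
  return isinstance(netmask, int) and 0 < netmask <= upper

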
class LUClusterSetParams(LogicalUnit):
3391
  """Change the parameters of the cluster.
3392

3393
  """
3394
  HPATH = "cluster-modify"
3395
  HTYPE = constants.HTYPE_CLUSTER
3396
  REQ_BGL = False
3397

    
3398
  def CheckArguments(self):
3399
    """Check parameters
3400

3401
    """
3402
    if self.op.uid_pool:
3403
      uidpool.CheckUidPool(self.op.uid_pool)
3404

    
3405
    if self.op.add_uids:
3406
      uidpool.CheckUidPool(self.op.add_uids)
3407

    
3408
    if self.op.remove_uids:
3409
      uidpool.CheckUidPool(self.op.remove_uids)
3410

    
3411
    if self.op.master_netmask is not None:
3412
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3413

    
3414
  def ExpandNames(self):
3415
    # FIXME: in the future maybe other cluster params won't require checking on
3416
    # all nodes to be modified.
3417
    self.needed_locks = {
3418
      locking.LEVEL_NODE: locking.ALL_SET,
3419
    }
3420
    self.share_locks[locking.LEVEL_NODE] = 1
3421

    
3422
  def BuildHooksEnv(self):
3423
    """Build hooks env.
3424

3425
    """
3426
    return {
3427
      "OP_TARGET": self.cfg.GetClusterName(),
3428
      "NEW_VG_NAME": self.op.vg_name,
3429
      }
3430

    
3431
  def BuildHooksNodes(self):
3432
    """Build hooks nodes.
3433

3434
    """
3435
    mn = self.cfg.GetMasterNode()
3436
    return ([mn], [mn])
3437

    
3438
  def CheckPrereq(self):
3439
    """Check prerequisites.
3440

3441
    This checks whether the given params don't conflict and
3442
    if the given volume group is valid.
3443

3444
    """
3445
    if self.op.vg_name is not None and not self.op.vg_name:
3446
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3447
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3448
                                   " instances exist", errors.ECODE_INVAL)
3449

    
3450
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3451
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3452
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3453
                                   " drbd-based instances exist",
3454
                                   errors.ECODE_INVAL)
3455

    
3456
    node_list = self.owned_locks(locking.LEVEL_NODE)
3457

    
3458
    # if vg_name not None, checks given volume group on all nodes
3459
    if self.op.vg_name:
3460
      vglist = self.rpc.call_vg_list(node_list)
3461
      for node in node_list:
3462
        msg = vglist[node].fail_msg
3463
        if msg:
3464
          # ignoring down node
3465
          self.LogWarning("Error while gathering data on node %s"
3466
                          " (ignoring node): %s", node, msg)
3467
          continue
3468
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3469
                                              self.op.vg_name,
3470
                                              constants.MIN_VG_SIZE)
3471
        if vgstatus:
3472
          raise errors.OpPrereqError("Error on node '%s': %s" %
3473
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3474

    
3475
    if self.op.drbd_helper:
3476
      # checks given drbd helper on all nodes
3477
      helpers = self.rpc.call_drbd_helper(node_list)
3478
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3479
        if ninfo.offline:
3480
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3481
          continue
3482
        msg = helpers[node].fail_msg
3483
        if msg:
3484
          raise errors.OpPrereqError("Error checking drbd helper on node"
3485
                                     " '%s': %s" % (node, msg),
3486
                                     errors.ECODE_ENVIRON)
3487
        node_helper = helpers[node].payload
3488
        if node_helper != self.op.drbd_helper:
3489
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3490
                                     (node, node_helper), errors.ECODE_ENVIRON)
3491

    
3492
    self.cluster = cluster = self.cfg.GetClusterInfo()
3493
    # validate params changes
3494
    if self.op.beparams:
3495
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3496
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3497

    
3498
    if self.op.ndparams:
3499
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3500
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3501

    
3502
      # TODO: we need a more general way to handle resetting
3503
      # cluster-level parameters to default values
3504
      if self.new_ndparams["oob_program"] == "":
3505
        self.new_ndparams["oob_program"] = \
3506
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3507

    
3508
    if self.op.nicparams:
3509
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3510
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3511
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3512
      nic_errors = []
3513

    
3514
      # check all instances for consistency
3515
      for instance in self.cfg.GetAllInstancesInfo().values():
3516
        for nic_idx, nic in enumerate(instance.nics):
3517
          params_copy = copy.deepcopy(nic.nicparams)
3518
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3519

    
3520
          # check parameter syntax
3521
          try:
3522
            objects.NIC.CheckParameterSyntax(params_filled)
3523
          except errors.ConfigurationError, err:
3524
            nic_errors.append("Instance %s, nic/%d: %s" %
3525
                              (instance.name, nic_idx, err))
3526

    
3527
          # if we're moving instances to routed, check that they have an ip
3528
          target_mode = params_filled[constants.NIC_MODE]
3529
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3530
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3531
                              " address" % (instance.name, nic_idx))
3532
      if nic_errors:
3533
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3534
                                   "\n".join(nic_errors))
3535

    
3536
    # hypervisor list/parameters
3537
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3538
    if self.op.hvparams:
3539
      for hv_name, hv_dict in self.op.hvparams.items():
3540
        if hv_name not in self.new_hvparams:
3541
          self.new_hvparams[hv_name] = hv_dict
3542
        else:
3543
          self.new_hvparams[hv_name].update(hv_dict)
3544

    
3545
    # os hypervisor parameters
3546
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3547
    if self.op.os_hvp:
3548
      for os_name, hvs in self.op.os_hvp.items():
3549
        if os_name not in self.new_os_hvp:
3550
          self.new_os_hvp[os_name] = hvs
3551
        else:
3552
          for hv_name, hv_dict in hvs.items():
3553
            if hv_name not in self.new_os_hvp[os_name]:
3554
              self.new_os_hvp[os_name][hv_name] = hv_dict
3555
            else:
3556
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3557

    
3558
    # os parameters
3559
    self.new_osp = objects.FillDict(cluster.osparams, {})
3560
    if self.op.osparams:
3561
      for os_name, osp in self.op.osparams.items():
3562
        if os_name not in self.new_osp:
3563
          self.new_osp[os_name] = {}
3564

    
3565
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3566
                                                  use_none=True)
3567

    
3568
        if not self.new_osp[os_name]:
3569
          # we removed all parameters
3570
          del self.new_osp[os_name]
3571
        else:
3572
          # check the parameter validity (remote check)
3573
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3574
                         os_name, self.new_osp[os_name])
3575

    
3576
    # changes to the hypervisor list
3577
    if self.op.enabled_hypervisors is not None:
3578
      self.hv_list = self.op.enabled_hypervisors
3579
      for hv in self.hv_list:
3580
        # if the hypervisor doesn't already exist in the cluster
3581
        # hvparams, we initialize it to empty, and then (in both
3582
        # cases) we make sure to fill the defaults, as we might not
3583
        # have a complete defaults list if the hypervisor wasn't
3584
        # enabled before
3585
        if hv not in new_hvp:
3586
          new_hvp[hv] = {}
3587
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3588
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3589
    else:
3590
      self.hv_list = cluster.enabled_hypervisors
3591

    
3592
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3593
      # either the enabled list has changed, or the parameters have, validate
3594
      for hv_name, hv_params in self.new_hvparams.items():
3595
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3596
            (self.op.enabled_hypervisors and
3597
             hv_name in self.op.enabled_hypervisors)):
3598
          # either this is a new hypervisor, or its parameters have changed
3599
          hv_class = hypervisor.GetHypervisor(hv_name)
3600
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3601
          hv_class.CheckParameterSyntax(hv_params)
3602
          _CheckHVParams(self, node_list, hv_name, hv_params)
3603

    
3604
    if self.op.os_hvp:
3605
      # no need to check any newly-enabled hypervisors, since the
3606
      # defaults have already been checked in the above code-block
3607
      for os_name, os_hvp in self.new_os_hvp.items():
3608
        for hv_name, hv_params in os_hvp.items():
3609
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3610
          # we need to fill in the new os_hvp on top of the actual hv_p
3611
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3612
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3613
          hv_class = hypervisor.GetHypervisor(hv_name)
3614
          hv_class.CheckParameterSyntax(new_osp)
3615
          _CheckHVParams(self, node_list, hv_name, new_osp)
3616

    
3617
    if self.op.default_iallocator:
3618
      alloc_script = utils.FindFile(self.op.default_iallocator,
3619
                                    constants.IALLOCATOR_SEARCH_PATH,
3620
                                    os.path.isfile)
3621
      if alloc_script is None:
3622
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3623
                                   " specified" % self.op.default_iallocator,
3624
                                   errors.ECODE_INVAL)
3625

    
3626
  def Exec(self, feedback_fn):
3627
    """Change the parameters of the cluster.
3628

3629
    """
3630
    if self.op.vg_name is not None:
3631
      new_volume = self.op.vg_name
3632
      if not new_volume:
3633
        new_volume = None
3634
      if new_volume != self.cfg.GetVGName():
3635
        self.cfg.SetVGName(new_volume)
3636
      else:
3637
        feedback_fn("Cluster LVM configuration already in desired"
3638
                    " state, not changing")
3639
    if self.op.drbd_helper is not None:
3640
      new_helper = self.op.drbd_helper
3641
      if not new_helper:
3642
        new_helper = None
3643
      if new_helper != self.cfg.GetDRBDHelper():
3644
        self.cfg.SetDRBDHelper(new_helper)
3645
      else:
3646
        feedback_fn("Cluster DRBD helper already in desired state,"
3647
                    " not changing")
3648
    if self.op.hvparams:
3649
      self.cluster.hvparams = self.new_hvparams
3650
    if self.op.os_hvp:
3651
      self.cluster.os_hvp = self.new_os_hvp
3652
    if self.op.enabled_hypervisors is not None:
3653
      self.cluster.hvparams = self.new_hvparams
3654
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3655
    if self.op.beparams:
3656
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3657
    if self.op.nicparams:
3658
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3659
    if self.op.osparams:
3660
      self.cluster.osparams = self.new_osp
3661
    if self.op.ndparams:
3662
      self.cluster.ndparams = self.new_ndparams
3663

    
3664
    if self.op.candidate_pool_size is not None:
3665
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3666
      # we need to update the pool size here, otherwise the save will fail
3667
      _AdjustCandidatePool(self, [])
3668

    
3669
    if self.op.maintain_node_health is not None:
3670
      self.cluster.maintain_node_health = self.op.maintain_node_health
3671

    
3672
    if self.op.prealloc_wipe_disks is not None:
3673
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3674

    
3675
    if self.op.add_uids is not None:
3676
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3677

    
3678
    if self.op.remove_uids is not None:
3679
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3680

    
3681
    if self.op.uid_pool is not None:
3682
      self.cluster.uid_pool = self.op.uid_pool
3683

    
3684
    if self.op.default_iallocator is not None:
3685
      self.cluster.default_iallocator = self.op.default_iallocator
3686

    
3687
    if self.op.reserved_lvs is not None:
3688
      self.cluster.reserved_lvs = self.op.reserved_lvs
3689

    
3690
    def helper_os(aname, mods, desc):
3691
      desc += " OS list"
3692
      lst = getattr(self.cluster, aname)
3693
      for key, val in mods:
3694
        if key == constants.DDM_ADD:
3695
          if val in lst:
3696
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3697
          else:
3698
            lst.append(val)
3699
        elif key == constants.DDM_REMOVE:
3700
          if val in lst:
3701
            lst.remove(val)
3702
          else:
3703
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3704
        else:
3705
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3706

    
3707
    if self.op.hidden_os:
3708
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3709

    
3710
    if self.op.blacklisted_os:
3711
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3712

    
3713
    if self.op.master_netdev:
3714
      master_params = self.cfg.GetMasterNetworkParameters()
3715
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3716
                  self.cluster.master_netdev)
3717
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3718
                                                       master_params.ip,
3719
                                                       master_params.netmask,
3720
                                                       master_params.netdev,
3721
                                                       master_params.ip_family)
3722
      result.Raise("Could not disable the master ip")
3723
      feedback_fn("Changing master_netdev from %s to %s" %
3724
                  (master_params.netdev, self.op.master_netdev))
3725
      self.cluster.master_netdev = self.op.master_netdev
3726

    
3727
    if self.op.master_netmask:
3728
      master_params = self.cfg.GetMasterNetworkParameters()
3729
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3730
      result = self.rpc.call_node_change_master_netmask(master_params.name,
3731
                                                        master_params.netmask,
3732
                                                        self.op.master_netmask,
3733
                                                        master_params.ip,
3734
                                                        master_params.netdev)
3735
      if result.fail_msg:
3736
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3737
        self.LogWarning(msg)
3738
        feedback_fn(msg)
3739
      else:
3740
        self.cluster.master_netmask = self.op.master_netmask
3741

    
3742
    self.cfg.Update(self.cluster, feedback_fn)
3743

    
3744
    if self.op.master_netdev:
3745
      master_params = self.cfg.GetMasterNetworkParameters()
3746
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3747
                  self.op.master_netdev)
3748
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3749
                                                     master_params.ip,
3750
                                                     master_params.netmask,
3751
                                                     master_params.netdev,
3752
                                                     master_params.ip_family)
3753
      if result.fail_msg:
3754
        self.LogWarning("Could not re-enable the master ip on"
3755
                        " the master, please restart manually: %s",
3756
                        result.fail_msg)
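

# Much of CheckPrereq and Exec above builds "filled" parameter dictionaries
# (hvparams, os_hvp, nicparams, beparams, ...) by overlaying explicit values
# on top of defaults via objects.FillDict. The helper below sketches what
# that layering amounts to for plain dicts; it is an illustration, not a
# replacement for objects.FillDict, and its name is hypothetical.
def _ExampleFillParams(defaults, custom):
  """Sketch: overlay custom parameter values on top of defaults.

  @type defaults: dict
  @param defaults: the base (e.g. cluster-level) parameters
  @type custom: dict
  @param custom: the overriding (e.g. per-OS or per-hypervisor) values
  @return: a new dict holding the defaults updated by the custom values

  """
  filled = copy.deepcopy(defaults)
  filled.update(custom)
  return filled

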
def _UploadHelper(lu, nodes, fname):
3760
  """Helper for uploading a file and showing warnings.
3761

3762
  """
3763
  if os.path.exists(fname):
3764
    result = lu.rpc.call_upload_file(nodes, fname)
3765
    for to_node, to_result in result.items():
3766
      msg = to_result.fail_msg
3767
      if msg:
3768
        msg = ("Copy of file %s to node %s failed: %s" %
3769
               (fname, to_node, msg))
3770
        lu.proc.LogWarning(msg)
3771

    
3772

    
3773
def _ComputeAncillaryFiles(cluster, redist):
3774
  """Compute files external to Ganeti which need to be consistent.
3775

3776
  @type redist: boolean
3777
  @param redist: Whether to include files which need to be redistributed
3778

3779
  """
3780
  # Compute files for all nodes
3781
  files_all = set([
3782
    constants.SSH_KNOWN_HOSTS_FILE,
3783
    constants.CONFD_HMAC_KEY,
3784
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3785
    constants.SPICE_CERT_FILE,
3786
    constants.SPICE_CACERT_FILE,
3787
    constants.RAPI_USERS_FILE,
3788
    ])
3789

    
3790
  if not redist:
3791
    files_all.update(constants.ALL_CERT_FILES)
3792
    files_all.update(ssconf.SimpleStore().GetFileList())
3793
  else:
3794
    # we need to ship at least the RAPI certificate
3795
    files_all.add(constants.RAPI_CERT_FILE)
3796

    
3797
  if cluster.modify_etc_hosts:
3798
    files_all.add(constants.ETC_HOSTS)
3799

    
3800
  # Files which are optional; these must:
3801
  # - be present in one other category as well
3802
  # - either exist or not exist on all nodes of that category (mc, vm all)
3803
  files_opt = set([
3804
    constants.RAPI_USERS_FILE,
3805
    ])
3806

    
3807
  # Files which should only be on master candidates
3808
  files_mc = set()
3809
  if not redist:
3810
    files_mc.add(constants.CLUSTER_CONF_FILE)
3811

    
3812
  # Files which should only be on VM-capable nodes
3813
  files_vm = set(filename
3814
    for hv_name in cluster.enabled_hypervisors
3815
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3816

    
3817
  files_opt |= set(filename
3818
    for hv_name in cluster.enabled_hypervisors
3819
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3820

    
3821
  # Filenames in each category must be unique
3822
  all_files_set = files_all | files_mc | files_vm
3823
  assert (len(all_files_set) ==
3824
          sum(map(len, [files_all, files_mc, files_vm]))), \
3825
         "Found file listed in more than one file list"
3826

    
3827
  # Optional files must be present in one other category
3828
  assert all_files_set.issuperset(files_opt), \
3829
         "Optional file not in a different required list"
3830

    
3831
  return (files_all, files_opt, files_mc, files_vm)
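

# _ComputeAncillaryFiles above relies on two invariants which are asserted
# at the end of the function: a file may appear in only one of the
# "all"/"mc"/"vm" categories, and every optional file must also appear in
# one of them. The check below restates those invariants over plain sets;
# the helper name is hypothetical.
def _ExampleFileCategoriesConsistent(files_all, files_opt, files_mc, files_vm):
  """Sketch: verify the ancillary file category invariants.

  @rtype: boolean
  @return: True iff the categories are pairwise disjoint and every
      optional file is listed in one of them

  """
  union = files_all | files_mc | files_vm
  disjoint = len(union) == (len(files_all) + len(files_mc) + len(files_vm))
  return disjoint and union.issuperset(files_opt)

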
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3835
  """Distribute additional files which are part of the cluster configuration.
3836

3837
  ConfigWriter takes care of distributing the config and ssconf files, but
3838
  there are more files which should be distributed to all nodes. This function
3839
  makes sure those are copied.
3840

3841
  @param lu: calling logical unit
3842
  @param additional_nodes: list of nodes not in the config to distribute to
3843
  @type additional_vm: boolean
3844
  @param additional_vm: whether the additional nodes are vm-capable or not
3845

3846
  """
3847
  # Gather target nodes
3848
  cluster = lu.cfg.GetClusterInfo()
3849
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3850

    
3851
  online_nodes = lu.cfg.GetOnlineNodeList()
3852
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3853

    
3854
  if additional_nodes is not None:
3855
    online_nodes.extend(additional_nodes)
3856
    if additional_vm:
3857
      vm_nodes.extend(additional_nodes)
3858

    
3859
  # Never distribute to master node
3860
  for nodelist in [online_nodes, vm_nodes]:
3861
    if master_info.name in nodelist:
3862
      nodelist.remove(master_info.name)
3863

    
3864
  # Gather file lists
3865
  (files_all, _, files_mc, files_vm) = \
3866
    _ComputeAncillaryFiles(cluster, True)
3867

    
3868
  # Never re-distribute configuration file from here
3869
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3870
              constants.CLUSTER_CONF_FILE in files_vm)
3871
  assert not files_mc, "Master candidates not handled in this function"
3872

    
3873
  filemap = [
3874
    (online_nodes, files_all),
3875
    (vm_nodes, files_vm),
3876
    ]
3877

    
3878
  # Upload the files
3879
  for (node_list, files) in filemap:
3880
    for fname in files:
3881
      _UploadHelper(lu, node_list, fname)
3882

    
3883

    
3884
class LUClusterRedistConf(NoHooksLU):
3885
  """Force the redistribution of cluster configuration.
3886

3887
  This is a very simple LU.
3888

3889
  """
3890
  REQ_BGL = False
3891

    
3892
  def ExpandNames(self):
3893
    self.needed_locks = {
3894
      locking.LEVEL_NODE: locking.ALL_SET,
3895
    }
3896
    self.share_locks[locking.LEVEL_NODE] = 1
3897

    
3898
  def Exec(self, feedback_fn):
3899
    """Redistribute the configuration.
3900

3901
    """
3902
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3903
    _RedistributeAncillaryFiles(self)
3904

    
3905

    
3906
class LUClusterActivateMasterIp(NoHooksLU):
3907
  """Activate the master IP on the master node.
3908

3909
  """
3910
  def Exec(self, feedback_fn):
3911
    """Activate the master IP.
3912

3913
    """
3914
    master_params = self.cfg.GetMasterNetworkParameters()
3915
    self.rpc.call_node_activate_master_ip(master_params.name,
3916
                                          master_params.ip,
3917
                                          master_params.netmask,
3918
                                          master_params.netdev,
3919
                                          master_params.ip_family)
3920

    
3921

    
3922
class LUClusterDeactivateMasterIp(NoHooksLU):
3923
  """Deactivate the master IP on the master node.
3924

3925
  """
3926
  def Exec(self, feedback_fn):
3927
    """Deactivate the master IP.
3928

3929
    """
3930
    master_params = self.cfg.GetMasterNetworkParameters()
3931
    self.rpc.call_node_deactivate_master_ip(master_params.name,
3932
                                            master_params.ip,
3933
                                            master_params.netmask,
3934
                                            master_params.netdev,
3935
                                            master_params.ip_family)
3936

    
3937

    
3938
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3939
  """Sleep and poll for an instance's disk to sync.
3940

3941
  """
3942
  if not instance.disks or disks is not None and not disks:
3943
    return True
3944

    
3945
  disks = _ExpandCheckDisks(instance, disks)
3946

    
3947
  if not oneshot:
3948
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3949

    
3950
  node = instance.primary_node
3951

    
3952
  for dev in disks:
3953
    lu.cfg.SetDiskID(dev, node)
3954

    
3955
  # TODO: Convert to utils.Retry
3956

    
3957
  retries = 0
3958
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3959
  while True:
3960
    max_time = 0
3961
    done = True
3962
    cumul_degraded = False
3963
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3964
    msg = rstats.fail_msg
3965
    if msg:
3966
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3967
      retries += 1
3968
      if retries >= 10:
3969
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3970
                                 " aborting." % node)
3971
      time.sleep(6)
3972
      continue
3973
    rstats = rstats.payload
3974
    retries = 0
3975
    for i, mstat in enumerate(rstats):
3976
      if mstat is None:
3977
        lu.LogWarning("Can't compute data for node %s/%s",
3978
                           node, disks[i].iv_name)
3979
        continue
3980

    
3981
      cumul_degraded = (cumul_degraded or
3982
                        (mstat.is_degraded and mstat.sync_percent is None))
3983
      if mstat.sync_percent is not None:
3984
        done = False
3985
        if mstat.estimated_time is not None:
3986
          rem_time = ("%s remaining (estimated)" %
3987
                      utils.FormatSeconds(mstat.estimated_time))
3988
          max_time = mstat.estimated_time
3989
        else:
3990
          rem_time = "no time estimate"
3991
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3992
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3993

    
3994
    # if we're done but degraded, let's do a few small retries, to
3995
    # make sure we see a stable and not transient situation; therefore
3996
    # we force restart of the loop
3997
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3998
      logging.info("Degraded disks found, %d retries left", degr_retries)
3999
      degr_retries -= 1
4000
      time.sleep(1)
4001
      continue
4002

    
4003
    if done or oneshot:
4004
      break
4005

    
4006
    time.sleep(min(60, max_time))
4007

    
4008
  if done:
4009
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
4010
  return not cumul_degraded
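

# _WaitForSync above is essentially a polling loop with two escape hatches:
# a bounded number of consecutive RPC failures, and a short grace period in
# which a "done but degraded" answer is re-checked before being believed.
# The function below sketches only that control flow; probe_fn stands in for
# call_blockdev_getmirrorstatus plus the per-disk bookkeeping, and all names
# and defaults here are assumptions of the sketch.
def _ExamplePollUntilSynced(probe_fn, sleep_fn=time.sleep,
                            max_failures=10, degraded_grace=10):
  """Sketch: poll a probe callable until it reports the disks as synced.

  @param probe_fn: callable returning (done, degraded, wait_seconds), or
      raising EnvironmentError to simulate a failed status RPC
  @param sleep_fn: callable used to wait between polls
  @param max_failures: consecutive probe failures tolerated before giving up
  @param degraded_grace: extra re-checks of a "done but degraded" result
  @rtype: boolean
  @return: True if the disks ended up in sync and not degraded

  """
  failures = 0
  while True:
    try:
      (done, degraded, wait) = probe_fn()
    except EnvironmentError:
      failures += 1
      if failures >= max_failures:
        raise
      sleep_fn(6)
      continue
    failures = 0
    if done and degraded and degraded_grace > 0:
      # like degr_retries above: distrust a transient "degraded" state
      degraded_grace -= 1
      sleep_fn(1)
      continue
    if done:
      return not degraded
    sleep_fn(min(60, wait))

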
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
4014
  """Check that mirrors are not degraded.
4015

4016
  The ldisk parameter, if True, will change the test from the
4017
  is_degraded attribute (which represents overall non-ok status for
4018
  the device(s)) to the ldisk (representing the local storage status).
4019

4020
  """
4021
  lu.cfg.SetDiskID(dev, node)
4022

    
4023
  result = True
4024

    
4025
  if on_primary or dev.AssembleOnSecondary():
4026
    rstats = lu.rpc.call_blockdev_find(node, dev)
4027
    msg = rstats.fail_msg
4028
    if msg:
4029
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
4030
      result = False
4031
    elif not rstats.payload:
4032
      lu.LogWarning("Can't find disk on node %s", node)
4033
      result = False
4034
    else:
4035
      if ldisk:
4036
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4037
      else:
4038
        result = result and not rstats.payload.is_degraded
4039

    
4040
  if dev.children:
4041
    for child in dev.children:
4042
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
4043

    
4044
  return result
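

# _CheckDiskConsistency above recurses into dev.children and only reports a
# healthy result if the device and all of its children check out. The sketch
# below shows the same tree walk over plain dicts; "disk" is a stand-in for
# objects.Disk and both names are hypothetical.
def _ExampleDiskTreeHealthy(disk, status_fn):
  """Sketch: recursively check a nested disk description.

  @type disk: dict
  @param disk: a dict with an optional "children" list of similar dicts
  @param status_fn: callable returning True if a single device is healthy
  @rtype: boolean
  @return: True only if the device and all of its children are healthy

  """
  healthy = status_fn(disk)
  for child in disk.get("children", []):
    # as in the original, the "and" short-circuits once a failure is seen
    healthy = healthy and _ExampleDiskTreeHealthy(child, status_fn)
  return healthy

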
class LUOobCommand(NoHooksLU):
4048
  """Logical unit for OOB handling.
4049

4050
  """
4051
  REQ_BGL = False
4052
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
4053

    
4054
  def ExpandNames(self):
4055
    """Gather locks we need.
4056

4057
    """
4058
    if self.op.node_names:
4059
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
4060
      lock_names = self.op.node_names
4061
    else:
4062
      lock_names = locking.ALL_SET
4063

    
4064
    self.needed_locks = {
4065
      locking.LEVEL_NODE: lock_names,
4066
      }
4067

    
4068
  def CheckPrereq(self):
4069
    """Check prerequisites.
4070

4071
    This checks:
4072
     - the node exists in the configuration
4073
     - OOB is supported
4074

4075
    Any errors are signaled by raising errors.OpPrereqError.
4076

4077
    """
4078
    self.nodes = []
4079
    self.master_node = self.cfg.GetMasterNode()
4080

    
4081
    assert self.op.power_delay >= 0.0
4082

    
4083
    if self.op.node_names:
4084
      if (self.op.command in self._SKIP_MASTER and
4085
          self.master_node in self.op.node_names):
4086
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
4087
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
4088

    
4089
        if master_oob_handler:
4090
          additional_text = ("run '%s %s %s' if you want to operate on the"
4091
                             " master regardless") % (master_oob_handler,
4092
                                                      self.op.command,
4093
                                                      self.master_node)
4094
        else:
4095
          additional_text = "it does not support out-of-band operations"
4096

    
4097
        raise errors.OpPrereqError(("Operating on the master node %s is not"
4098
                                    " allowed for %s; %s") %
4099
                                   (self.master_node, se