#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc

import ganeti.masterd.instance # pylint: disable=W0611


#: Size of DRBD meta block device
DRBD_META_SIZE = 128


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcode.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs


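# Illustrative sketch for ResultWithJobs above (not part of the module): an
# LU that wants follow-up jobs submitted on its behalf can return it from
# Exec, e.g.::
#
#   return ResultWithJobs([[op_a], [op_b, op_c]], extra_key="extra value")
#
# where op_a/op_b/op_c are opcode objects (placeholder names). The processor
# then submits the two jobs and includes their job IDs in the opcode result;
# the keyword arguments are kept in the C{other} dict.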
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc_runner):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc_runner
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and the
    # "could be a function" warning
    # pylint: disable=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False,
                          level=locking.LEVEL_NODE):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances
    @param level: Which lock level to use for locking nodes

    """
    assert level in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
      self.needed_locks[level] = wanted_nodes
    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
      self.needed_locks[level].extend(wanted_nodes)
    else:
      raise errors.ProgrammerError("Unknown recalculation mode")

    del self.recalculate_locks[level]


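# Illustrative sketch only: a minimal LogicalUnit subclass roughly follows
# the pattern below (LUExampleNoop is a made-up name, not a real LU)::
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}
#
#     def BuildHooksEnv(self):
#       return {"OP_TARGET": self.cfg.GetClusterName()}
#
#     def BuildHooksNodes(self):
#       return ([], [self.cfg.GetMasterNode()])
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       return True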
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


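# Illustrative sketch only: an LU can delegate its work to tasklets by
# assigning a list of them, typically in ExpandNames, e.g.::
#
#   self.tasklets = [SomeTasklet(self, item) for item in self.op.items]
#
# ("SomeTasklet" and "self.op.items" are placeholder names.) The base
# LogicalUnit.CheckPrereq and LogicalUnit.Exec then iterate over
# self.tasklets and call each tasklet's CheckPrereq/Exec in order.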
class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, qfilter, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Sort only if no names were requested
    self.sort_by_name = not self.names

    self.do_locking = None
    self.wanted = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    if self.do_locking:
      names = lu.owned_locks(lock_level)
    else:
      names = all_names

    if self.wanted == locking.ALL_SET:
      assert not self.names
      # caller didn't specify names, so ordering is not important
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return self.query.OldStyleQuery(self._GetQueryData(lu),
                                    sort_by_name=self.sort_by_name)


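# Illustrative sketch only: concrete query helpers derive from _QueryBase,
# set FIELDS to their field definitions and implement the abstract methods,
# roughly like this (the names below are placeholders)::
#
#   class _ExampleQuery(_QueryBase):
#     FIELDS = ...  # field definitions for this object type
#
#     def ExpandNames(self, lu):
#       lu.needed_locks = {}  # declare locks, honouring self.use_locking
#
#     def DeclareLocks(self, lu, level):
#       pass
#
#     def _GetQueryData(self, lu):
#       return ...  # data later passed to query.GetQueryResponse/OldStyleQuery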
def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)


def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is of the wrong type

  """
  if nodes:
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy


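# Worked example for _GetUpdatedParams above (values are illustrative only):
#
#   _GetUpdatedParams({"a": 1, "b": 2},
#                     {"a": constants.VALUE_DEFAULT, "c": 3})
#   -> {"b": 2, "c": 3}
#
# i.e. "a" is reset to its default by being removed from the result, "b" is
# kept unchanged and "c" is added.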
def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
         "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  if should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in lu.owned_locks(level):
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"


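# Illustrative usage of _ReleaseLocks above (node names are made up):
#
#   # keep only the node locks we still need
#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=["node1.example.com"])
#
#   # or release an explicit set of locks
#   _ReleaseLocks(self, locking.LEVEL_NODE, names=["node2.example.com"])
#
# Passing neither "names" nor "keep" releases all locks owned at that level.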
def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)


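# Illustrative result shape for _MapInstanceDisksToNodes above (the node and
# volume names are made up; only the structure is meaningful):
#
#   {("node1.example.com", "xenvg/disk0"): "inst1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "inst1.example.com"}
#
# i.e. every (node, volume) pair belonging to any of the given instances maps
# back to the owning instance's name.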
def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # pylint: disable=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  if not tags:
    tags = []

  env["INSTANCE_TAGS"] = " ".join(tags)

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


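# Illustrative only: for an instance with one NIC and one disk, the dict
# returned by _BuildInstanceHookEnv above contains keys such as OP_TARGET,
# INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_OS_TYPE,
# INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS, INSTANCE_DISK_TEMPLATE,
# INSTANCE_HYPERVISOR, INSTANCE_NIC_COUNT, INSTANCE_NIC0_IP/_MAC/_MODE/_LINK,
# INSTANCE_DISK_COUNT, INSTANCE_DISK0_SIZE/_MODE, INSTANCE_TAGS and one
# INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter. The
# hooks runner later prefixes every key with "GANETI_".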
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_up,
    "memory": bep[constants.BE_MEMORY],
    "vcpus": bep[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": bep,
    "hvp": hvp,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both an iallocator and a node",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator")


def _GetDefaultIAllocator(cfg, iallocator):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type iallocator: string or None
  @param iallocator: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not iallocator:
    # Use default iallocator
    iallocator = cfg.GetDefaultIAllocator()

  if not iallocator:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return iallocator


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master_params.name)

    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params)
    result.Raise("Could not disable the master role")

    return master_params.name


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101

  def _ErrorIf(self, cond, ecode, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable=E1101

    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    (_, etxt, _) = ecode
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING

    if cond:
      self._Error(ecode, *args, **kwargs)

    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond


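# Illustrative only: depending on the opcode's error_codes flag, _Error above
# reports through feedback_fn either a machine-parseable line, e.g.
#
#   - ERROR:EXYZ:node:node1.example.com:something went wrong
#
# or the plain format
#
#   - ERROR: node node1.example.com: something went wrong
#
# The error text ("EXYZ"), item type and message here are made up; only the
# layout follows the formatting code in _VerifyErrors._Error.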
class LUClusterVerify(NoHooksLU):
1499
  """Submits all jobs necessary to verify the cluster.
1500

1501
  """
1502
  REQ_BGL = False
1503

    
1504
  def ExpandNames(self):
1505
    self.needed_locks = {}
1506

    
1507
  def Exec(self, feedback_fn):
1508
    jobs = []
1509

    
1510
    if self.op.group_name:
1511
      groups = [self.op.group_name]
1512
      depends_fn = lambda: None
1513
    else:
1514
      groups = self.cfg.GetNodeGroupList()
1515

    
1516
      # Verify global configuration
1517
      jobs.append([
1518
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1519
        ])
1520

    
1521
      # Always depend on global verification
1522
      depends_fn = lambda: [(-len(jobs), [])]
1523

    
1524
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1525
                                            ignore_errors=self.op.ignore_errors,
1526
                                            depends=depends_fn())]
1527
                for group in groups)
1528

    
1529
    # Fix up all parameters
1530
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1531
      op.debug_simulate_errors = self.op.debug_simulate_errors
1532
      op.verbose = self.op.verbose
1533
      op.error_codes = self.op.error_codes
1534
      try:
1535
        op.skip_checks = self.op.skip_checks
1536
      except AttributeError:
1537
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1538

    
1539
    return ResultWithJobs(jobs)


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = True

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
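
  # Illustrative sketch (not part of the original code): hvp_data, as produced
  # by _GetAllHypervisorParameters(), is a list of
  # (source-description, hypervisor-name, parameter-dict) tuples, e.g.
  #
  #   [("cluster", "kvm", {"kernel_path": "/boot/vmlinuz"}),
  #    ("instance inst1.example.com", "kvm", {...})]
  #
  # The concrete entries shown here are made-up examples.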

  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node,
         utils.CommaJoin(dangling_instances.get(node,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
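
  # Illustrative sketch (not part of the original code): Exec() below builds
  # one NodeImage per node in the group and fills it incrementally, e.g.
  #
  #   nimg = LUClusterVerifyGroup.NodeImage(offline=False,
  #                                         name="node1.example.com",
  #                                         vm_capable=True)
  #   nimg.pinst.append("inst1.example.com")   # from the configuration
  #   nimg.mfree = 2048                        # later, from the RPC results
  #
  # The hostnames and values are made up for illustration.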

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)
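
  # Illustrative numeric sketch (not part of the original code), assuming
  # constants.NODE_MAX_CLOCK_SKEW were 150 seconds:
  #
  #   nvinfo_starttime = 1000.0, nvinfo_endtime = 1002.0
  #   ntime_merged     = 1200.0  -> 1200.0 > 1002.0 + 150, reported as
  #                                 diverging by "198.0s" (CV_ENODETIME)
  #   ntime_merged     = 1100.0  -> inside the [start - skew, end + skew]
  #                                 window, no error
  #
  # The window is widened by the RPC start/end times because the exact moment
  # the node sampled its clock is unknown.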

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)
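
  # Illustrative numeric sketch (not part of the original code): if node A is
  # secondary for two auto-balanced instances whose primary is node B, with
  # BE_MEMORY of 1024 and 2048 MiB, then needed_mem for (A, prinode=B) is
  # 3072 MiB; should A report mfree == 2500 MiB, CV_ENODEN1 is raised because
  # A could not host both instances if B failed.  The numbers are made up.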

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))
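
  # Illustrative sketch (not part of the original code): after the collection
  # loop above, fileinfo maps each expected file to its observed checksums,
  # e.g. (hostnames and hashes are made up):
  #
  #   fileinfo["/var/lib/ganeti/config.data"] = {
  #     "1234567890abcdef...": set(["node1.example.com", "node2.example.com"]),
  #     "fedcba0987654321...": set(["node3.example.com"]),
  #   }
  #
  # which would be reported as two different checksum variants of that file.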

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result == None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk
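
  # Illustrative sketch (not part of the original code): the returned mapping
  # has the shape described in the docstring above, e.g. for a two-disk DRBD
  # instance (names made up):
  #
  #   instdisk["inst1.example.com"] = {
  #     "node1.example.com": [(True, status_disk0), (True, status_disk1)],
  #     "node2.example.com": [(False, "node offline"),
  #                           (False, "node offline")],
  #   }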

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))
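
  # Illustrative sketch (not part of the original code): with two foreign
  # groups each contributing one cycling iterator, the classmethod above
  # returns something like (hostnames made up):
  #
  #   (["node1.example.com", "node2.example.com"],
  #    {"node1.example.com": ["groupA-node1", "groupB-node1"],
  #     "node2.example.com": ["groupA-node2", "groupB-node1"]})
  #
  # i.e. every online node of this group gets one SSH-check target per foreign
  # group, spread over that group's members by the round-robin iterators.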

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if nodeinfo.vm_capable and not nodeinfo.offline:
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2793

    
2794
    feedback_fn("* Verifying node status")
2795

    
2796
    refos_img = None
2797

    
2798
    for node_i in node_data_list:
2799
      node = node_i.name
2800
      nimg = node_image[node]
2801

    
2802
      if node_i.offline:
2803
        if verbose:
2804
          feedback_fn("* Skipping offline node %s" % (node,))
2805
        n_offline += 1
2806
        continue
2807

    
2808
      if node == master_node:
2809
        ntype = "master"
2810
      elif node_i.master_candidate:
2811
        ntype = "master candidate"
2812
      elif node_i.drained:
2813
        ntype = "drained"
2814
        n_drained += 1
2815
      else:
2816
        ntype = "regular"
2817
      if verbose:
2818
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2819

    
2820
      msg = all_nvinfo[node].fail_msg
2821
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2822
               msg)
2823
      if msg:
2824
        nimg.rpc_fail = True
2825
        continue
2826

    
2827
      nresult = all_nvinfo[node].payload
2828

    
2829
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2830
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2831
      self._VerifyNodeNetwork(node_i, nresult)
2832
      self._VerifyOob(node_i, nresult)
2833

    
2834
      if nimg.vm_capable:
2835
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2836
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2837
                             all_drbd_map)
2838

    
2839
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2840
        self._UpdateNodeInstances(node_i, nresult, nimg)
2841
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2842
        self._UpdateNodeOS(node_i, nresult, nimg)
2843

    
2844
        if not nimg.os_fail:
2845
          if refos_img is None:
2846
            refos_img = nimg
2847
          self._VerifyNodeOS(node_i, nimg, refos_img)
2848
        self._VerifyNodeBridges(node_i, nresult, bridges)
2849

    
2850
        # Check whether all running instancies are primary for the node. (This
2851
        # can no longer be done from _VerifyInstance below, since some of the
2852
        # wrong instances could be from other node groups.)
2853
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2854

    
2855
        for inst in non_primary_inst:
2856
          test = inst in self.all_inst_info
2857
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2858
                   "instance should not run on node %s", node_i.name)
2859
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2860
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               constants.CV_EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1,
                      constants.CV_EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                 snode, "instance %s, connection to secondary node failed",
                 instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2936
               "instance has offline secondary node(s) %s",
2937
               utils.CommaJoin(inst_nodes_offline))
2938
      # ... or ghost/non-vm_capable nodes
2939
      for node in inst_config.all_nodes:
2940
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2941
                 instance, "instance lives on ghost node %s", node)
2942
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2943
                 instance, "instance lives on non-vm_capable node %s", node)
2944

    
2945
    feedback_fn("* Verifying orphan volumes")
2946
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2947

    
2948
    # We will get spurious "unknown volume" warnings if any node of this group
2949
    # is secondary for an instance whose primary is in another group. To avoid
2950
    # them, we find these instances and add their volumes to node_vol_should.
2951
    for inst in self.all_inst_info.values():
2952
      for secondary in inst.secondary_nodes:
2953
        if (secondary in self.my_node_info
2954
            and inst.name not in self.my_inst_info):
2955
          inst.MapLVsByNode(node_vol_should)
2956
          break
2957

    
2958
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2959

    
2960
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2961
      feedback_fn("* Verifying N+1 Memory redundancy")
2962
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2963

    
2964
    feedback_fn("* Other Notes")
2965
    if i_non_redundant:
2966
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2967
                  % len(i_non_redundant))
2968

    
2969
    if i_non_a_balanced:
2970
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2971
                  % len(i_non_a_balanced))
2972

    
2973
    if n_offline:
2974
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2975

    
2976
    if n_drained:
2977
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2978

    
2979
    return not self.bad
2980

    
2981
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2982
    """Analyze the post-hooks' result
2983

2984
    This method analyses the hook result, handles it, and sends some
2985
    nicely-formatted feedback back to the user.
2986

2987
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2988
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2989
    @param hooks_results: the results of the multi-node hooks rpc call
2990
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

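    # The job list built below contains one single-opcode job per node group,
    # e.g. (group names are illustrative only):
    #   [[opcodes.OpGroupVerifyDisks(group_name="default")],
    #    [opcodes.OpGroupVerifyDisks(group_name="storage")]]
    # so each group can be verified by an independent job.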
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
                                             owned_groups)

      assert self.group_uuid in inst_groups, \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(list(key))

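    # Illustrative shape of the returned tuple (all names made up):
    #   ({"node2.example.com": "error enumerating LVs"},   # per-node errors
    #    ["instance1.example.com"],                        # need activate-disks
    #    {"instance3.example.com": [["node3.example.com", "xenvg/disk0"]]})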
    return (res_nodes, list(res_instances), res_missing)


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
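        # the reported size is in bytes while disk.size is recorded in
        # mebibytes, so shift right by 20 bits (divide by 2**20) before
        # comparing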
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params)
    result.Raise("Could not disable the master role")

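    # Update the configuration while the master IP is down; the finally
    # clause below re-activates it (on the new address) even if the update
    # or the known_hosts redistribution fails.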
    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
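  # The netmask is a CIDR prefix length (e.g. 24 for an IPv4 /24); which
  # values are acceptable depends on the cluster's primary IP family,
  # resolved below.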
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                                (netmask))


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
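    # objects.FillDict(defaults, custom) returns a copy of 'defaults' updated
    # with 'custom'; with an empty override it is used here simply to get a
    # private copy of cluster.hvparams before merging in the requested changes.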
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

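    # Helper applying DDM-style modifications to one of the cluster OS lists;
    # 'mods' is a list of (action, os_name) pairs, e.g. (illustrative only)
    # [(constants.DDM_ADD, "debootstrap+default")], and 'aname' is the name of
    # the cluster attribute to modify ("hidden_os" or "blacklisted_os").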
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

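    # persist all of the in-memory cluster object changes made above in a
    # single configuration update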
    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())
  else:
    # we need to ship at least the RAPI certificate
    files_all.add(constants.RAPI_CERT_FILE)

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
         "Optional file not in a different required list"

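  # Returned categories: files_all (every node), files_opt (allowed to be
  # missing), files_mc (master candidates only), files_vm (VM-capable nodes
  # only).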
  return (files_all, files_opt, files_mc, files_vm)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

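  # Pair each target node list with the file set it should receive; files
  # restricted to master candidates are left to ConfigWriter, as asserted
  # above.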
  filemap = [
    (online_nodes, files_all),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    self.rpc.call_node_activate_master_ip(master_params.name,
                                          master_params)


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    self.rpc.call_node_deactivate_master_ip(master_params.name, master_params)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

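    # wait for the longest estimated sync time before polling again, but
    # never more than 60 seconds between polls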
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

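  # Composite devices (e.g. DRBD8 on top of LVM) are checked recursively;
  # the children are always tested with the plain is_degraded check.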
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
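  # OOB commands which may not target the master node through this LU
  # (enforced in CheckPrereq below)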
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program: