Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 63742d78

History | View | Annotate | Download (489.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62
from ganeti import rpc
63

    
64
import ganeti.masterd.instance # pylint: disable=W0611
65

    
66

    
67
#: Size of DRBD meta block device
68
DRBD_META_SIZE = 128
69

    
70

    
71
class ResultWithJobs:
  """Data container for LU results with jobs.

  When L{LogicalUnit.Exec} returns an instance of this class,
  L{mcpu.Processor._ProcessResult} picks it up, submits the opcodes held
  in the C{jobs} attribute and includes the resulting job IDs in the
  opcode result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Any additional return values can be passed as keyword arguments; they
    are stored unchanged in the C{other} attribute.

    @type jobs: list of lists of L{opcode.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.other = kwargs
    self.jobs = jobs
91

    
92

    
93
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc_runner):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    @param processor: the mcpu processor executing this LU; also supplies
        the Log/LogWarning/LogInfo/LogStep callbacks aliased below
    @param op: the opcode instance this LU will execute
    @param context: object providing at least C{cfg} and C{glm} attributes
        (cluster configuration and Ganeti lock manager)
    @param rpc_runner: runner used for all inter-node RPC calls

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc_runner
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    # Runs after validation so CheckArguments sees canonical parameters
    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possible
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    # When tasklets are used, prerequisite checking is delegated to them;
    # otherwise the default is a no-op and subclasses are expected to
    # override this method.
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    # Same delegation pattern as CheckPrereq, but without tasklets an
    # implementation is mandatory.
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and could
    # be a function warnings
    # pylint: disable=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      # Guard against double-declaration of instance-level locks
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False,
                          level=locking.LEVEL_NODE):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances
    @param level: Which lock level to use for locking nodes

    """
    assert level in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
      self.needed_locks[level] = wanted_nodes
    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
      self.needed_locks[level].extend(wanted_nodes)
    else:
      raise errors.ProgrammerError("Unknown recalculation mode")

    # Each recalculation mode may be consumed only once per level
    del self.recalculate_locks[level]
400

    
401

    
402
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
  """Simple LU which runs no hooks.

  Parent class for LogicalUnits that never run hooks; deriving from it
  avoids duplicating the empty hook definitions in every such LU.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLu.

    Reaching this method is always a programming error, hence the
    assertion instead of a normal error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    As with BuildHooksEnv, calling this indicates a bug in the caller.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
425

    
426

    
427
class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents of LUs; an LU can be built entirely out of
  tasklets or mix them with legacy code. All locking has to happen in the
  LU — tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts to commonly used LU members
    self.rpc = lu.rpc
    self.cfg = lu.cfg

  def CheckPrereq(self):
    """Check prerequisites for this tasklets.

    Verifies that all preconditions for running this tasklet hold.
    Internode communication is allowed, but the check must stay idempotent:
    no cluster or system state may be changed.

    Should raise errors.OpPrereqError when a precondition is not met; the
    return value is ignored. Parameters not yet canonicalized should be
    brought into their canonical form here.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    Performs the actual work. Failures that are expected, or are dealt
    with somewhere in the code, should be signalled by raising
    errors.OpExecError.

    """
    raise NotImplementedError
471

    
472

    
473
class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, qfilter, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Only sort when the caller did not ask for specific names
    self.sort_by_name = not self.names

    self.wanted = None
    self.do_locking = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    names = lu.owned_locks(lock_level) if self.do_locking else all_names

    if self.wanted == locking.ALL_SET:
      # caller didn't specify names, so ordering is not important
      assert not self.names
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    data = self._GetQueryData(lu)
    return query.GetQueryResponse(self.query, data,
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    data = self._GetQueryData(lu)
    return self.query.OldStyleQuery(data, sort_by_name=self.sort_by_name)
560

    
561

    
562
def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict((level, 1) for level in locking.LEVELS)
567

    
568

    
569
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
570
  """Checks if the owned node groups are still correct for an instance.
571

572
  @type cfg: L{config.ConfigWriter}
573
  @param cfg: The cluster configuration
574
  @type instance_name: string
575
  @param instance_name: Instance name
576
  @type owned_groups: set or frozenset
577
  @param owned_groups: List of currently owned node groups
578

579
  """
580
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
581

    
582
  if not owned_groups.issuperset(inst_groups):
583
    raise errors.OpPrereqError("Instance %s's node groups changed since"
584
                               " locks were acquired, current groups are"
585
                               " are '%s', owning groups '%s'; retry the"
586
                               " operation" %
587
                               (instance_name,
588
                                utils.CommaJoin(inst_groups),
589
                                utils.CommaJoin(owned_groups)),
590
                               errors.ECODE_STATE)
591

    
592
  return inst_groups
593

    
594

    
595
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
596
  """Checks if the instances in a node group are still correct.
597

598
  @type cfg: L{config.ConfigWriter}
599
  @param cfg: The cluster configuration
600
  @type group_uuid: string
601
  @param group_uuid: Node group UUID
602
  @type owned_instances: set or frozenset
603
  @param owned_instances: List of currently owned instances
604

605
  """
606
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
607
  if owned_instances != wanted_instances:
608
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
609
                               " locks were acquired, wanted '%s', have '%s';"
610
                               " retry the operation" %
611
                               (group_uuid,
612
                                utils.CommaJoin(wanted_instances),
613
                                utils.CommaJoin(owned_instances)),
614
                               errors.ECODE_STATE)
615

    
616
  return wanted_instances
617

    
618

    
619
def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  ndparams = cfg.GetNdParams(node)
  return ndparams[constants.ND_OOB_PROGRAM]
630

    
631

    
632
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of node names (sorted when all nodes were requested)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  return [_ExpandNodeName(lu.cfg, node_name) for node_name in nodes]
648

    
649

    
650
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instance names (sorted when all were requested)
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  return [_ExpandInstanceName(lu.cfg, inst_name) for inst_name in instances]
668

    
669

    
670
def _GetUpdatedParams(old_params, update_dict,
671
                      use_default=True, use_none=False):
672
  """Return the new version of a parameter dictionary.
673

674
  @type old_params: dict
675
  @param old_params: old parameters
676
  @type update_dict: dict
677
  @param update_dict: dict containing new parameter values, or
678
      constants.VALUE_DEFAULT to reset the parameter to its default
679
      value
680
  @param use_default: boolean
681
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
682
      values as 'to be deleted' values
683
  @param use_none: boolean
684
  @type use_none: whether to recognise C{None} values as 'to be
685
      deleted' values
686
  @rtype: dict
687
  @return: the new parameter dictionary
688

689
  """
690
  params_copy = copy.deepcopy(old_params)
691
  for key, val in update_dict.iteritems():
692
    if ((use_default and val == constants.VALUE_DEFAULT) or
693
        (use_none and val is None)):
694
      try:
695
        del params_copy[key]
696
      except KeyError:
697
        pass
698
    else:
699
      params_copy[key] = val
700
  return params_copy
701

    
702

    
703
def _ReleaseLocks(lu, level, names=None, keep=None):
704
  """Releases locks owned by an LU.
705

706
  @type lu: L{LogicalUnit}
707
  @param level: Lock level
708
  @type names: list or None
709
  @param names: Names of locks to release
710
  @type keep: list or None
711
  @param keep: Names of locks to retain
712

713
  """
714
  assert not (keep is not None and names is not None), \
715
         "Only one of the 'names' and the 'keep' parameters can be given"
716

    
717
  if names is not None:
718
    should_release = names.__contains__
719
  elif keep:
720
    should_release = lambda name: name not in keep
721
  else:
722
    should_release = None
723

    
724
  if should_release:
725
    retain = []
726
    release = []
727

    
728
    # Determine which locks to release
729
    for name in lu.owned_locks(level):
730
      if should_release(name):
731
        release.append(name)
732
      else:
733
        retain.append(name)
734

    
735
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
736

    
737
    # Release just some locks
738
    lu.glm.release(level, names=release)
739

    
740
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
741
  else:
742
    # Release everything
743
    lu.glm.release(level)
744

    
745
    assert not lu.glm.is_owned(level), "No locks should be owned"
746

    
747

    
748
def _MapInstanceDisksToNodes(instances):
749
  """Creates a map from (node, volume) to instance name.
750

751
  @type instances: list of L{objects.Instance}
752
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
753

754
  """
755
  return dict(((node, vol), inst.name)
756
              for inst in instances
757
              for (node, vols) in inst.MapLVsByNode().items()
758
              for vol in vols)
759

    
760

    
761
def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hooks_mgr = lu.proc.BuildHooksManager(lu)
  try:
    hooks_mgr.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # Deliberately catch everything: hook failures must never abort the
    # surrounding operation, only produce a warning.
    # pylint: disable=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
771

    
772

    
773
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  all_fields = utils.FieldSet()
  all_fields.Extend(static)
  all_fields.Extend(dynamic)

  unknown = all_fields.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown), errors.ECODE_INVAL)
790

    
791

    
792
def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if not used_globals:
    return

  msg = ("The following hypervisor parameters are global and cannot"
         " be customized at instance level, please modify them at"
         " cluster level: %s" % utils.CommaJoin(used_globals))
  raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
805

    
806

    
807
def _CheckNodeOnline(lu, node, msg=None):
808
  """Ensure that a given node is online.
809

810
  @param lu: the LU on behalf of which we make the check
811
  @param node: the node to check
812
  @param msg: if passed, should be a message to replace the default one
813
  @raise errors.OpPrereqError: if the node is offline
814

815
  """
816
  if msg is None:
817
    msg = "Can't use offline node"
818
  if lu.cfg.GetNodeInfo(node).offline:
819
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
820

    
821

    
822
def _CheckNodeNotDrained(lu, node):
823
  """Ensure that a given node is not drained.
824

825
  @param lu: the LU on behalf of which we make the check
826
  @param node: the node to check
827
  @raise errors.OpPrereqError: if the node is drained
828

829
  """
830
  if lu.cfg.GetNodeInfo(node).drained:
831
    raise errors.OpPrereqError("Can't use drained node %s" % node,
832
                               errors.ECODE_STATE)
833

    
834

    
835
def _CheckNodeVmCapable(lu, node):
836
  """Ensure that a given node is vm capable.
837

838
  @param lu: the LU on behalf of which we make the check
839
  @param node: the node to check
840
  @raise errors.OpPrereqError: if the node is not vm capable
841

842
  """
843
  if not lu.cfg.GetNodeInfo(node).vm_capable:
844
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
845
                               errors.ECODE_STATE)
846

    
847

    
848
def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  os_result = lu.rpc.call_os_get(node, os_name)
  os_result.Raise("OS '%s' not in supported OS list for node %s" %
                  (os_name, node),
                  prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(os_result.payload, os_name)
864

    
865

    
866
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Verify that a node reports the given secondary ip as configured.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if result.payload:
    return
  msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
         " please fix and re-run this command" % secondary_ip)
  if prereq:
    raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  raise errors.OpExecError(msg)
891

    
892

    
893
def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  @return: the single-line content of the cluster domain secret file
      (C{constants.CLUSTER_DOMAIN_SECRET_FILE}); C{strict=True} makes the
      read fail rather than silently accept a malformed file

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)
899

    
900

    
901
def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running.

  Checks both the instance's configured (admin) state and the runtime
  state reported by the hypervisor on the primary node.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param reason: text appended to the error messages

  """
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  primary = instance.primary_node
  runtime = lu.rpc.call_instance_list([primary],
                                      [instance.hypervisor])[primary]
  runtime.Raise("Can't contact node %s for instance information" % primary,
                prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in runtime.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)
915

    
916

    
917
def _ExpandItemName(fn, name, kind):
  """Expand a (possibly partial) item name into its full form.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  expanded = fn(name)
  if expanded is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return expanded
932

    
933

    
934
def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes.

  @param cfg: the cluster configuration, providing C{ExpandNodeName}
  @param name: the (possibly abbreviated) node name
  @return: the full node name
  @raise errors.OpPrereqError: if the node is not known

  """
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
937

    
938

    
939
def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance.

  @param cfg: the cluster configuration, providing C{ExpandInstanceName}
  @param name: the (possibly abbreviated) instance name
  @return: the full instance name
  @raise errors.OpPrereqError: if the instance is not known

  """
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
942

    
943

    
944
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": "up" if status else "down",
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  # per-NIC variables; a None ip is exported as the empty string
  nics = nics or []
  env["INSTANCE_NIC_COUNT"] = len(nics)
  for idx, (ip, mac, mode, link) in enumerate(nics):
    env["INSTANCE_NIC%d_IP" % idx] = ip if ip is not None else ""
    env["INSTANCE_NIC%d_MAC" % idx] = mac
    env["INSTANCE_NIC%d_MODE" % idx] = mode
    env["INSTANCE_NIC%d_LINK" % idx] = link
    if mode == constants.NIC_MODE_BRIDGED:
      # for bridged NICs the link doubles as the bridge name
      env["INSTANCE_NIC%d_BRIDGE" % idx] = link

  # per-disk variables
  disks = disks or []
  env["INSTANCE_DISK_COUNT"] = len(disks)
  for idx, (size, mode) in enumerate(disks):
    env["INSTANCE_DISK%d_SIZE" % idx] = size
    env["INSTANCE_DISK%d_MODE" % idx] = mode

  env["INSTANCE_TAGS"] = " ".join(tags or [])

  # export the backend and hypervisor parameter dicts verbatim
  for source, prefix in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (prefix, key)] = value

  return env
1037

    
1038

    
1039
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  cluster = lu.cfg.GetClusterInfo()
  result = []
  for nic in nics:
    # fill in cluster-level defaults before reading mode/link
    params = cluster.SimpleFillNIC(nic.nicparams)
    result.append((nic.ip, nic.mac,
                   params[constants.NIC_MODE],
                   params[constants.NIC_LINK]))
  return result
1061

    
1062

    
1063
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  be_full = cluster.FillBE(instance)
  hv_full = cluster.FillHV(instance)
  # collect all arguments for _BuildInstanceHookEnv so that the caller's
  # override dict can replace any of them before the call
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_up,
    "memory": be_full[constants.BE_MEMORY],
    "vcpus": be_full[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": be_full,
    "hvp": hv_full,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1100

    
1101

    
1102
def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  Lets the configuration promote nodes to the master candidate role as
  needed and re-adds them to the context; warns when there are more
  candidates than the desired pool size.

  @param lu: the LU on behalf of which we make the adjustment
  @param exceptions: passed through to the config's candidate-pool
      functions (entries to leave out of the computation)

  """
  # MaintainCandidatePool returns the node objects that were promoted
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for node in mod_list:
      lu.context.ReaddNode(node)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    # use lazy argument formatting, consistently with the LogInfo call above
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)",
               mc_now, mc_max)
1116

    
1117

    
1118
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  @return: whether the current candidate count is below what the pool
      would need once this node is added

  """
  pool_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # adding the new node raises the candidate maximum by one, capped at
  # the configured pool size
  wanted = min(mc_should + 1, pool_size)
  return mc_now < wanted
1127

    
1128

    
1129
def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  @param lu: the LU on behalf of which we make the check
  @param target_nics: list of NIC objects to check
  @param target_node: node on which the bridges must exist
  @raise errors.OpPrereqError: if a required bridge is missing

  """
  cluster = lu.cfg.GetClusterInfo()
  bridges = []
  for nic in target_nics:
    params = cluster.SimpleFillNIC(nic.nicparams)
    if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      # in bridged mode the link parameter names the bridge
      bridges.append(params[constants.NIC_LINK])
  if bridges:
    result = lu.rpc.call_bridges_exist(target_node, bridges)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1141

    
1142

    
1143
def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  @param node: node on which to check the bridges; defaults to the
      instance's primary node

  """
  target = node if node is not None else instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, target)
1150

    
1151

    
1152
def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)

  if not os_obj.supported_variants:
    # variant-less OS: a variant in the name is an error
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return

  # OS with variants: a variant is mandatory and must be supported
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)
  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1174

    
1175

    
1176
def _GetNodeInstancesInner(cfg, fn):
  """Return all configured instances matching a predicate.

  @param cfg: the cluster configuration
  @param fn: predicate taking an instance object
  @return: list of instance objects for which C{fn} is true

  """
  result = []
  for inst in cfg.GetAllInstancesInfo().values():
    if fn(inst):
      result.append(inst)
  return result
1178

    
1179

    
1180
def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  @param cfg: the cluster configuration
  @param node_name: name of the node
  @return: list of instance objects having C{node_name} among their nodes

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1186

    
1187

    
1188
def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  @param cfg: the cluster configuration
  @param node_name: name of the node
  @return: list of instance objects whose primary node is C{node_name}

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)
1194

    
1195

    
1196
def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  @param cfg: the cluster configuration
  @param node_name: name of the node
  @return: list of instance objects having C{node_name} among their
      secondary nodes

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)
1202

    
1203

    
1204
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  @param cfg: the cluster configuration
  @param storage_type: the storage type to build arguments for
  @return: list of extra arguments for the storage backend (empty for
      all types except file storage)

  """
  if storage_type != constants.ST_FILE:
    return []
  # Special case for file storage:
  # storage.FileStorage wants a list of storage directories
  return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1214

    
1215

    
1216
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  """Return the indices of an instance's disks that are faulty on a node.

  @param cfg: the cluster configuration
  @param rpc_runner: RPC runner used to query the node
  @param instance: the instance whose disks are checked
  @param node_name: the node on which to check the disks
  @param prereq: whether an RPC failure raises a prerequisite or an
      execution error
  @return: list of indices of the faulty disks

  """
  # the disk IDs have to be set for the target node before querying it
  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  return [idx for idx, bdev_status in enumerate(result.payload)
          if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY]
1231

    
1232

    
1233
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot
  @raise errors.OpPrereqError: if both are given, or if neither is given
      and no cluster-wide default iallocator is defined

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      # pass the error classification explicitly, like every other
      # OpPrereqError raised in this module
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)
1263

    
1264

    
1265
def _GetDefaultIAllocator(cfg, iallocator):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type iallocator: string or None
  @param iallocator: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name
  @raise errors.OpPrereqError: if neither the opcode nor the cluster
      provides an iallocator

  """
  if iallocator:
    return iallocator

  # fall back to the cluster-wide default
  default = cfg.GetDefaultIAllocator()
  if default:
    return default

  raise errors.OpPrereqError("No iallocator was specified, neither in the"
                             " opcode nor as a cluster-wide default",
                             errors.ECODE_INVAL)
1286

    
1287

    
1288
class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env: only the cluster name is exported.

    """
    return {"OP_TARGET": self.cfg.GetClusterName()}

  def BuildHooksNodes(self):
    """Build hooks nodes: post hooks run on the master node only.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do; this LU exists only to trigger the hooks.

    """
    return True
1314

    
1315

    
1316
class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    @return: environment exporting only the cluster name as OP_TARGET

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: empty pre/post node lists; the post hook on the master is
        run explicitly from L{Exec} instead

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    # only the master node itself may remain in the configuration
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    @return: the name of the (former) master node

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master_params.name)

    # finally take down the master IP, possibly via an external script
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    return master_params.name
1373

    
1374

    
1375
def _VerifyCertificate(filename):
1376
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1377

1378
  @type filename: string
1379
  @param filename: Path to PEM file
1380

1381
  """
1382
  try:
1383
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1384
                                           utils.ReadFile(filename))
1385
  except Exception, err: # pylint: disable=W0703
1386
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1387
            "Failed to load X509 certificate %s: %s" % (filename, err))
1388

    
1389
  (errcode, msg) = \
1390
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1391
                                constants.SSL_CERT_EXPIRATION_ERROR)
1392

    
1393
  if msg:
1394
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1395
  else:
1396
    fnamemsg = None
1397

    
1398
  if errcode is None:
1399
    return (None, fnamemsg)
1400
  elif errcode == utils.CERT_WARNING:
1401
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1402
  elif errcode == utils.CERT_ERROR:
1403
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1404

    
1405
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1406

    
1407

    
1408
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @param instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  # cluster-level defaults per enabled hypervisor
  params = [("cluster", hv, cluster.GetHVDefaults(hv))
            for hv in cluster.enabled_hypervisors]

  # per-OS overrides, fully filled with defaults
  for os_name, os_hvp in cluster.os_hvp.items():
    params.extend(("os %s" % os_name, hv,
                   cluster.GetHVDefaults(hv, os_name=os_name))
                  for hv, hv_params in os_hvp.items() if hv_params)

  # TODO: collapse identical parameter values in a single one
  # per-instance overrides, fully filled with defaults
  params.extend(("instance %s" % inst.name, inst.hypervisor,
                 cluster.FillHV(inst))
                for inst in instances if inst.hvparams)

  return params
1438

    
1439

    
1440
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    etype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    (itype, etxt, _) = ecode
    # interpolate positional arguments first, if any
    if args:
      msg = msg % args
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      # machine-parseable, colon-separated representation
      line = "%s:%s:%s:%s:%s" % (etype, etxt, itype, item, msg)
    else:
      # human-readable representation; the item part is optional
      line = "%s: %s%s: %s" % (etype, itype,
                               " " + item if item else "", msg)
    # hand the result to the registered feedback function
    self._feedback_fn("  - %s" % line) # Mix-in. pylint: disable=E1101

  def _ErrorIf(self, cond, ecode, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable=E1101

    # errors the operator asked to ignore are demoted to warnings
    (_, etxt, _) = ecode
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING

    if cond:
      self._Error(ecode, *args, **kwargs)

    # only genuine errors (not demoted warnings) fail the operation
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
1497

    
1498

    
1499
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # no locks needed here; the real verification work is delegated to
    # the submitted sub-jobs
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Submit one verification job per node group, plus (when verifying
    the whole cluster) a global configuration-verification job that the
    group jobs depend on.

    """
    jobs = []

    if self.op.group_name:
      # verifying a single group: no config job, hence no dependencies
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
        ])

      # Always depend on global verification
      # (-len(jobs) is a *relative* dependency pointing back at the
      # config-verification job appended above)
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
                                            ignore_errors=self.op.ignore_errors,
                                            depends=depends_fn())]
                for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        # only the group-verification opcodes carry a skip_checks slot
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
1541

    
1542

    
1543
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1544
  """Verifies the cluster config.
1545

1546
  """
1547
  REQ_BGL = True
1548

    
1549
  def _VerifyHVP(self, hvp_data):
1550
    """Verifies locally the syntax of the hypervisor parameters.
1551

1552
    """
1553
    for item, hv_name, hv_params in hvp_data:
1554
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1555
             (item, hv_name))
1556
      try:
1557
        hv_class = hypervisor.GetHypervisor(hv_name)
1558
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1559
        hv_class.CheckParameterSyntax(hv_params)
1560
      except errors.GenericError, err:
1561
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1562

    
1563
  def ExpandNames(self):
1564
    # Information can be safely retrieved as the BGL is acquired in exclusive
1565
    # mode
1566
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1567
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1568
    self.all_node_info = self.cfg.GetAllNodesInfo()
1569
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1570
    self.needed_locks = {}
1571

    
1572
  def Exec(self, feedback_fn):
1573
    """Verify integrity of cluster, performing various test on nodes.
1574

1575
    """
1576
    self.bad = False
1577
    self._feedback_fn = feedback_fn
1578

    
1579
    feedback_fn("* Verifying cluster config")
1580

    
1581
    for msg in self.cfg.VerifyConfig():
1582
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1583

    
1584
    feedback_fn("* Verifying cluster certificate files")
1585

    
1586
    for cert_filename in constants.ALL_CERT_FILES:
1587
      (errcode, msg) = _VerifyCertificate(cert_filename)
1588
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1589

    
1590
    feedback_fn("* Verifying hypervisor parameters")
1591

    
1592
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1593
                                                self.all_inst_info.values()))
1594

    
1595
    feedback_fn("* Verifying all nodes belong to an existing group")
1596

    
1597
    # We do this verification here because, should this bogus circumstance
1598
    # occur, it would never be caught by VerifyGroup, which only acts on
1599
    # nodes/instances reachable from existing node groups.
1600

    
1601
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1602
                         if node.group not in self.all_group_info)
1603

    
1604
    dangling_instances = {}
1605
    no_node_instances = []
1606

    
1607
    for inst in self.all_inst_info.values():
1608
      if inst.primary_node in dangling_nodes:
1609
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1610
      elif inst.primary_node not in self.all_node_info:
1611
        no_node_instances.append(inst.name)
1612

    
1613
    pretty_dangling = [
1614
        "%s (%s)" %
1615
        (node.name,
1616
         utils.CommaJoin(dangling_instances.get(node.name,
1617
                                                ["no instances"])))
1618
        for node in dangling_nodes]
1619

    
1620
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1621
                  None,
1622
                  "the following nodes (and their instances) belong to a non"
1623
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1624

    
1625
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1626
                  None,
1627
                  "the following instances have a non-existing primary-node:"
1628
                  " %s", utils.CommaJoin(no_node_instances))
1629

    
1630
    return not self.bad
1631

    
1632

    
1633
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1634
  """Verifies the status of a node group.
1635

1636
  """
1637
  HPATH = "cluster-verify"
1638
  HTYPE = constants.HTYPE_CLUSTER
1639
  REQ_BGL = False
1640

    
1641
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1642

    
1643
  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      # identity and configuration-derived state
      self.name = name
      # runtime data, filled in later from the node's RPC reply
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      # per-node failure flags, set while processing verify RPC results
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
  def ExpandNames(self):
    """Resolve the target group and declare the initial lock set.

    """
    # LookupNodeGroup raises errors.OpPrereqError for an unknown group
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Instances currently in the node group; this is unsafe and is
    # re-verified later, under the locks
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: group_instances,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()
  def DeclareLocks(self, level):
    """Declare the node-level locks once instance locks are owned.

    """
    if level != locking.LEVEL_NODE:
      return

    # Members of the node group; this is unsafe and is re-verified in
    # CheckPrereq
    member_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

    all_instances = self.cfg.GetAllInstancesInfo()

    # In Exec(), we warn about mirrored instances that have primary and
    # secondary living in separate node groups. To fully verify that
    # volumes for these instances are healthy, we will need to do an
    # extra call to their secondaries. We ensure here those nodes will
    # be locked.
    for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
      # Important: access only the instances whose lock is owned
      instance = all_instances[inst_name]
      if instance.disk_template in constants.DTS_INT_MIRROR:
        member_nodes.update(instance.secondary_nodes)

    self.needed_locks[locking.LEVEL_NODE] = member_nodes
  def CheckPrereq(self):
    """Check prerequisites.

    Verifies that all nodes and instances of the target group (plus the
    extra nodes needed for split-LV verification) are actually locked,
    and precomputes the node/instance info dictionaries used by Exec().

    @raise errors.OpPrereqError: if any required lock is not owned

    """
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    # Nodes/instances in the group for which we do not own a lock; group
    # membership may have changed between ExpandNames and here
    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    # Per-group views of the cluster-wide info dicts
    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    # These should have been locked by DeclareLocks(LEVEL_NODE)
    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)
  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    # remote version must be a (protocol, release) pair
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    # a protocol mismatch makes all further results unusable
    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version; only a warning, not fatal
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # per-hypervisor verification results (None means success)
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # hypervisor parameter verification failures (list of tuples)
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    # NV_NODESETUP is a list of error strings; empty list means OK
    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True
  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    raw_time = nresult.get(constants.NV_TIME, None)
    try:
      remote_time = utils.MergeTime(raw_time)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    # Allow for the RPC round-trip time plus the configured maximum skew
    lower_bound = nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW
    upper_bound = nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW
    if remote_time < lower_bound:
      skew_msg = "%.01fs" % abs(nvinfo_starttime - remote_time)
    elif remote_time > upper_bound:
      skew_msg = "%.01fs" % abs(remote_time - nvinfo_endtime)
    else:
      skew_msg = None

    _ErrorIf(skew_msg is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             skew_msg)
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # the configured VG must exist and be big enough (at least MIN_VG_SIZE)
    vglist = nresult.get(constants.NV_VGLIST, None)
    no_vglist = not vglist
    _ErrorIf(no_vglist, constants.CV_ENODELVM, node,
             "unable to check volume groups")
    if not no_vglist:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # PV names must not contain ':', since that is a special character
    # for lvcreate (it denotes the range of PEs to use on the PV)
    pvlist = nresult.get(constants.NV_PVLIST, None)
    no_pvlist = pvlist is None
    _ErrorIf(no_pvlist, constants.CV_ENODELVM, node,
             "Can't get PV list from node")
    if not no_pvlist:
      for _, pvname, owner_vg in pvlist:
        _ErrorIf(":" in pvname, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # the node reports back the list of bridges it is missing
    missing = nresult.get(constants.NV_BRIDGES, None)
    bad_data = not isinstance(missing, list)
    _ErrorIf(bad_data, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not bad_data:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    # idiomatic membership test (was "not X in Y")
    test = constants.NV_USERSCRIPTS not in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    # the node reports back the list of broken (missing/non-executable)
    # scripts; an empty list means everything is fine
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))
  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # SSH connectivity: NV_NODELIST maps failed peer -> error message
    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    # TCP connectivity: NV_NODENETTEST maps failed peer -> error message
    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    # master IP reachability: NV_MASTERIP is a plain boolean
    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        # the failure reason differs for the master node itself
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    @param instance: the instance name
    @param instanceconfig: the instance configuration object
    @param node_image: dict of node name to L{NodeImage}
    @param diskstatus: per-node disk status, as a dict of node name to a
        list of (success, status) pairs, one per disk

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node_current = instanceconfig.primary_node

    # map of node -> list of LVs the instance should have there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      # an offline primary cannot report its instance list
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    # flatten diskstatus into (node, success, status, disk-index) tuples
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      expected_vols = node_vol_should.get(node, [])
      for volume in n_img.volumes:
        # orphan: not expected on this node and not a reserved name
        is_orphan = (volume not in expected_vols and
                     not reserved.Matches(volume))
        self._ErrorIf(is_orphan, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    @param node_image: dict of node name to L{NodeImage}
    @param instance_cfg: dict of instance name to instance objects

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        # sum the memory needed to take over all instances whose
        # primary is prinode, counting only auto-balanced instances
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accomodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)
  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    Note: the last parameter uses Python 2 tuple unpacking in the
    signature; it is the (all, optional, master-candidate, vm-capable)
    file-set 4-tuple.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    # filename -> {checksum -> set of node names having that checksum}
    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        # offline nodes cannot report checksums; exclude from all checks
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      # PEP 8: compare against None with "is", not "=="
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # NV_OSLIST must be a list of 7-element lists (one per OS entry)
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    # os name -> list of (path, status, diagnose, variants, params, api)
    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    # render [(key, value), ...] parameter lists as "key: value" strings
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))
  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # The oob helper is invoked on the master, so only paths reported by
    # the master and/or master candidates need to be verified here
    if not (ninfo.master_candidate or ninfo.master_capable):
      return
    if constants.NV_OOB_PATHS not in nresult:
      return

    node_name = ninfo.name
    for path_result in nresult[constants.NV_OOB_PATHS]:
      # A non-empty path_result is the error description itself
      self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node_name,
                    path_result)
2386

    
2387
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node_name = ninfo.name

    # Assume failure until the LV listing proves to be usable
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")

    if vg_name is None:
      # No volume group configured, nothing to check
      return

    if isinstance(lvdata, basestring):
      # A string payload carries the node-side error message
      self._ErrorIf(True, constants.CV_ENODELVM, node_name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, node_name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False
2416

    
2417
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    inst_list = nresult.get(constants.NV_INSTANCELIST, None)
    # Anything but a list means the hypervisor query failed remotely
    failed = not isinstance(inst_list, list)
    self._ErrorIf(failed, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(inst_list)))
    if failed:
      nimg.hyp_fail = True
    else:
      nimg.instances = inst_list
2439

    
2440
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node_name = ninfo.name

    # Free memory as reported by the hypervisor
    hv_info = nresult.get(constants.NV_HVINFO, None)
    hv_bad = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(hv_bad, constants.CV_ENODEHV, node_name,
                  "rpc call to node failed (hvinfo)")
    if not hv_bad:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, node_name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      vg_bad = (constants.NV_VGLIST not in nresult or
                vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(vg_bad, constants.CV_ENODELVM, node_name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not vg_bad:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, node_name,
                        "node returned invalid LVM info, check LVM status")
2478

    
2479
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}            # node name -> [(instance name, disk), ...]
    node_disks_devonly = {}    # node name -> [disk copies with node IDs set]
    diskless_instances = set() # instances with no disks at all
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      # All instances this node is primary or secondary for
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          # Sanitize the payload entry by entry; each must be a 2-tuple/list
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      # Pair each (instance, disk) with its status; zip preserves disk order
      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    # Consistency check: per-instance disk counts, node counts and status shape
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk
2575

    
2576
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    Candidate hosts are the online nodes living outside the given group;
    they are bucketed per group and each bucket becomes an endless
    (cycled) iterator over its sorted node names.

    """
    get_group = operator.attrgetter("group")
    candidates = [node for node in all_nodes
                  if node.group != group_uuid and not node.offline]

    # groupby needs its input pre-sorted by the same key
    grouped_names = [sorted(node.name for node in members)
                     for _, members in itertools.groupby(
                       sorted(candidates, key=get_group), get_group)]

    return map(itertools.cycle, grouped_names)
2590

    
2591
  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    selectors = cls._SshNodeSelector(group_uuid, all_nodes)

    # For every online node pick the next name from each per-group cycle,
    # so targets rotate across the foreign groups
    targets = {}
    for name in online_nodes:
      targets[name] = sorted([sel.next() for sel in selectors])

    return (online_nodes, targets)
2609

    
2610
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {}
    env["CLUSTER_TAGS"] = " ".join(self.cfg.GetClusterInfo().GetTags())

    # One NODE_TAGS_<name> entry per node in this group
    for node in self.my_node_info.values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env
2625

    
2626
  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    # No pre-phase nodes; post-phase hooks run on all nodes of this group
    return ([], self.my_node_names)
2631

    
2632
  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various test on nodes.

    Builds the expected cluster state for the group, gathers runtime data
    from all its nodes via a single node_verify RPC, then runs the
    per-node, per-instance and cluster-wide consistency checks, reporting
    problems through C{self._ErrorIf}/C{feedback_fn}.

    @param feedback_fn: function used to report progress back to the caller
    @return: True if no errors were registered, False otherwise

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)

    # Parameters for the single node_verify RPC issued to every group node
    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    # LVM checks only make sense with a configured volume group
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    # Fill the node images with the instances they host; nodes referenced
    # by an instance but outside this group become "ghost" entries
    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    # Extra nodes outside the group whose LVs must still be listed
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if nodeinfo.vm_capable and not nodeinfo.offline:
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    # First node with a valid OS listing becomes the OS reference image
    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instancies are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    # Fold in LV data gathered from the extra (out-of-group) nodes
    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               constants.CV_EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1,
                      constants.CV_EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                 snode, "instance %s, connection to secondary node failed",
                 instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
               "instance has offline secondary node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
                 instance, "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad
3007

    
3008
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names or phase != constants.HOOKS_PHASE_POST:
      return lu_result

    # Used to change hooks' output to proper indentation
    feedback_fn("* Hooks Results")
    assert hooks_results, "invalid result from hooks"

    for node_name in hooks_results:
      res = hooks_results[node_name]
      msg = res.fail_msg
      comm_failure = msg and not res.offline
      self._ErrorIf(comm_failure, constants.CV_ENODEHOOKS, node_name,
                    "Communication failure in hooks execution: %s", msg)
      if res.offline or msg:
        # No need to investigate payload if node is offline or gave
        # an error.
        continue
      for script, hkr, output in res.payload:
        script_failed = hkr == constants.HKR_FAIL
        self._ErrorIf(script_failed, constants.CV_ENODEHOOKS, node_name,
                      "Script %s failed, output:", script)
        if script_failed:
          # Re-indent the script's output and flag the whole run as failed
          feedback_fn("%s" % self._HOOKS_INDENT_RE.sub("      ", output))
          lu_result = False

    return lu_result
3053

    
3054

    
3055
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Shared locks suffice: we only need to read the list of node groups
    self.share_locks = _ShareAll()
    self.needed_locks = {locking.LEVEL_NODEGROUP: locking.ALL_SET}

  def Exec(self, feedback_fn):
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    jobs = []
    for group in self.owned_locks(locking.LEVEL_NODEGROUP):
      jobs.append([opcodes.OpGroupVerifyDisks(group_name=group)])

    return ResultWithJobs(jobs)
3073

    
3074

    
3075
class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Resolve the target group and declare (initially empty) lock lists.

    The per-level lock lists are filled in by L{DeclareLocks}.

    """
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    """Declare locks per level.

    Instance and node group locks are computed from configuration data
    read *before* the corresponding locks are held ("optimistic"
    locking); L{CheckPrereq} re-validates them once everything is
    actually locked.

    """
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    """Re-validate the optimistically acquired locks.

    Asserts that the instances, their nodes and their groups have not
    changed between lock computation and lock acquisition.

    """
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
                                             owned_groups)

      assert self.group_uuid in inst_groups, \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    # Map of (node, lv_name) -> instance, only for instances that are
    # supposed to be running (admin_up)
    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      # Only query nodes we hold a lock on and which can host VMs
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          # Record the per-node RPC failure and keep going with other nodes
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        # payload maps lv_name -> (vg?, attrs?, online-flag); an LV that
        # exists but is offline means the instance needs activate-disks
        # NOTE(review): exact meaning of the first two tuple fields is not
        # visible here -- confirm against the lv_list RPC implementation
        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(list(key))

    return (res_nodes, list(res_instances), res_missing)
class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Compute needed locks, depending on whether the opcode carries an
    explicit instance list.

    """
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      # Node resource locks are derived from the instance locks later on
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      # No instance list given: check every instance in the cluster
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    """Lock only the primary nodes of the wanted instances.

    """
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object
    @rtype: boolean
    @return: whether any child size was corrected

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    @return: list of (instance name, disk index, new size) tuples for
        disks whose recorded size was corrected

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    # Group disks by the primary node that can report their real size
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for node, dskl in per_node_disks.items():
      # Work on copies so SetDiskID doesn't modify the config objects
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        # A node returning the wrong number of sizes cannot be trusted
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        # Convert bytes to mebibytes, the unit used in the configuration
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    The new name must resolve, must differ from the current name/IP in
    at least one aspect, and a new IP must not already be live on the
    network.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      # An answering host at the new IP means the address is already taken
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    # Store the resolved (possibly canonicalized) name for Exec
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    The master IP is taken down first, the configuration is updated and
    redistributed, and the master IP is (re)started on the new address
    even if the update fails.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        # The master already has the file; only push to the other nodes
        node_list.remove(master_params.name)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      # Always try to bring the master IP back up, on the new address
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername
def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    # FIX: pass the error classification code, as done everywhere else in
    # this module (OpPrereqError without an ECODE_* loses the error class)
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    # FIX: likewise, classify the failure as invalid input
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                                (netmask), errors.ECODE_INVAL)
class LUClusterSetParams(LogicalUnit):
3420
  """Change the parameters of the cluster.
3421

3422
  """
3423
  HPATH = "cluster-modify"
3424
  HTYPE = constants.HTYPE_CLUSTER
3425
  REQ_BGL = False
3426

    
3427
  def CheckArguments(self):
3428
    """Check parameters
3429

3430
    """
3431
    if self.op.uid_pool:
3432
      uidpool.CheckUidPool(self.op.uid_pool)
3433

    
3434
    if self.op.add_uids:
3435
      uidpool.CheckUidPool(self.op.add_uids)
3436

    
3437
    if self.op.remove_uids:
3438
      uidpool.CheckUidPool(self.op.remove_uids)
3439

    
3440
    if self.op.master_netmask is not None:
3441
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3442

    
3443
  def ExpandNames(self):
3444
    # FIXME: in the future maybe other cluster params won't require checking on
3445
    # all nodes to be modified.
3446
    self.needed_locks = {
3447
      locking.LEVEL_NODE: locking.ALL_SET,
3448
    }
3449
    self.share_locks[locking.LEVEL_NODE] = 1
3450

    
3451
  def BuildHooksEnv(self):
3452
    """Build hooks env.
3453

3454
    """
3455
    return {
3456
      "OP_TARGET": self.cfg.GetClusterName(),
3457
      "NEW_VG_NAME": self.op.vg_name,
3458
      }
3459

    
3460
  def BuildHooksNodes(self):
3461
    """Build hooks nodes.
3462

3463
    """
3464
    mn = self.cfg.GetMasterNode()
3465
    return ([mn], [mn])
3466

    
3467
  def CheckPrereq(self):
3468
    """Check prerequisites.
3469

3470
    This checks whether the given params don't conflict and
3471
    if the given volume group is valid.
3472

3473
    """
3474
    if self.op.vg_name is not None and not self.op.vg_name:
3475
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3476
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3477
                                   " instances exist", errors.ECODE_INVAL)
3478

    
3479
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3480
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3481
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3482
                                   " drbd-based instances exist",
3483
                                   errors.ECODE_INVAL)
3484

    
3485
    node_list = self.owned_locks(locking.LEVEL_NODE)
3486

    
3487
    # if vg_name not None, checks given volume group on all nodes
3488
    if self.op.vg_name:
3489
      vglist = self.rpc.call_vg_list(node_list)
3490
      for node in node_list:
3491
        msg = vglist[node].fail_msg
3492
        if msg:
3493
          # ignoring down node
3494
          self.LogWarning("Error while gathering data on node %s"
3495
                          " (ignoring node): %s", node, msg)
3496
          continue
3497
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3498
                                              self.op.vg_name,
3499
                                              constants.MIN_VG_SIZE)
3500
        if vgstatus:
3501
          raise errors.OpPrereqError("Error on node '%s': %s" %
3502
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3503

    
3504
    if self.op.drbd_helper:
3505
      # checks given drbd helper on all nodes
3506
      helpers = self.rpc.call_drbd_helper(node_list)
3507
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3508
        if ninfo.offline:
3509
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3510
          continue
3511
        msg = helpers[node].fail_msg
3512
        if msg:
3513
          raise errors.OpPrereqError("Error checking drbd helper on node"
3514
                                     " '%s': %s" % (node, msg),
3515
                                     errors.ECODE_ENVIRON)
3516
        node_helper = helpers[node].payload
3517
        if node_helper != self.op.drbd_helper:
3518
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3519
                                     (node, node_helper), errors.ECODE_ENVIRON)
3520

    
3521
    self.cluster = cluster = self.cfg.GetClusterInfo()
3522
    # validate params changes
3523
    if self.op.beparams:
3524
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3525
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3526

    
3527
    if self.op.ndparams:
3528
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3529
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3530

    
3531
      # TODO: we need a more general way to handle resetting
3532
      # cluster-level parameters to default values
3533
      if self.new_ndparams["oob_program"] == "":
3534
        self.new_ndparams["oob_program"] = \
3535
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3536

    
3537
    if self.op.nicparams:
3538
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3539
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3540
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3541
      nic_errors = []
3542

    
3543
      # check all instances for consistency
3544
      for instance in self.cfg.GetAllInstancesInfo().values():
3545
        for nic_idx, nic in enumerate(instance.nics):
3546
          params_copy = copy.deepcopy(nic.nicparams)
3547
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3548

    
3549
          # check parameter syntax
3550
          try:
3551
            objects.NIC.CheckParameterSyntax(params_filled)
3552
          except errors.ConfigurationError, err:
3553
            nic_errors.append("Instance %s, nic/%d: %s" %
3554
                              (instance.name, nic_idx, err))
3555

    
3556
          # if we're moving instances to routed, check that they have an ip
3557
          target_mode = params_filled[constants.NIC_MODE]
3558
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3559
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3560
                              " address" % (instance.name, nic_idx))
3561
      if nic_errors:
3562
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3563
                                   "\n".join(nic_errors))
3564

    
3565
    # hypervisor list/parameters
3566
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3567
    if self.op.hvparams:
3568
      for hv_name, hv_dict in self.op.hvparams.items():
3569
        if hv_name not in self.new_hvparams:
3570
          self.new_hvparams[hv_name] = hv_dict
3571
        else:
3572
          self.new_hvparams[hv_name].update(hv_dict)
3573

    
3574
    # os hypervisor parameters
3575
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3576
    if self.op.os_hvp:
3577
      for os_name, hvs in self.op.os_hvp.items():
3578
        if os_name not in self.new_os_hvp:
3579
          self.new_os_hvp[os_name] = hvs
3580
        else:
3581
          for hv_name, hv_dict in hvs.items():
3582
            if hv_name not in self.new_os_hvp[os_name]:
3583
              self.new_os_hvp[os_name][hv_name] = hv_dict
3584
            else:
3585
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3586

    
3587
    # os parameters
3588
    self.new_osp = objects.FillDict(cluster.osparams, {})
3589
    if self.op.osparams:
3590
      for os_name, osp in self.op.osparams.items():
3591
        if os_name not in self.new_osp:
3592
          self.new_osp[os_name] = {}
3593

    
3594
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3595
                                                  use_none=True)
3596

    
3597
        if not self.new_osp[os_name]:
3598
          # we removed all parameters
3599
          del self.new_osp[os_name]
3600
        else:
3601
          # check the parameter validity (remote check)
3602
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3603
                         os_name, self.new_osp[os_name])
3604

    
3605
    # changes to the hypervisor list
3606
    if self.op.enabled_hypervisors is not None:
3607
      self.hv_list = self.op.enabled_hypervisors
3608
      for hv in self.hv_list:
3609
        # if the hypervisor doesn't already exist in the cluster
3610
        # hvparams, we initialize it to empty, and then (in both
3611
        # cases) we make sure to fill the defaults, as we might not
3612
        # have a complete defaults list if the hypervisor wasn't
3613
        # enabled before
3614
        if hv not in new_hvp:
3615
          new_hvp[hv] = {}
3616
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3617
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3618
    else:
3619
      self.hv_list = cluster.enabled_hypervisors
3620

    
3621
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3622
      # either the enabled list has changed, or the parameters have, validate
3623
      for hv_name, hv_params in self.new_hvparams.items():
3624
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3625
            (self.op.enabled_hypervisors and
3626
             hv_name in self.op.enabled_hypervisors)):
3627
          # either this is a new hypervisor, or its parameters have changed
3628
          hv_class = hypervisor.GetHypervisor(hv_name)
3629
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3630
          hv_class.CheckParameterSyntax(hv_params)
3631
          _CheckHVParams(self, node_list, hv_name, hv_params)
3632

    
3633
    if self.op.os_hvp:
3634
      # no need to check any newly-enabled hypervisors, since the
3635
      # defaults have already been checked in the above code-block
3636
      for os_name, os_hvp in self.new_os_hvp.items():
3637
        for hv_name, hv_params in os_hvp.items():
3638
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3639
          # we need to fill in the new os_hvp on top of the actual hv_p
3640
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3641
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3642
          hv_class = hypervisor.GetHypervisor(hv_name)
3643
          hv_class.CheckParameterSyntax(new_osp)
3644
          _CheckHVParams(self, node_list, hv_name, new_osp)
3645

    
3646
    if self.op.default_iallocator:
3647
      alloc_script = utils.FindFile(self.op.default_iallocator,
3648
                                    constants.IALLOCATOR_SEARCH_PATH,
3649
                                    os.path.isfile)
3650
      if alloc_script is None:
3651
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3652
                                   " specified" % self.op.default_iallocator,
3653
                                   errors.ECODE_INVAL)
3654

    
3655
  def Exec(self, feedback_fn):
3656
    """Change the parameters of the cluster.
3657

3658
    """
3659
    if self.op.vg_name is not None:
3660
      new_volume = self.op.vg_name
3661
      if not new_volume:
3662
        new_volume = None
3663
      if new_volume != self.cfg.GetVGName():
3664
        self.cfg.SetVGName(new_volume)
3665
      else:
3666
        feedback_fn("Cluster LVM configuration already in desired"
3667
                    " state, not changing")
3668
    if self.op.drbd_helper is not None:
3669
      new_helper = self.op.drbd_helper
3670
      if not new_helper:
3671
        new_helper = None
3672
      if new_helper != self.cfg.GetDRBDHelper():
3673
        self.cfg.SetDRBDHelper(new_helper)
3674
      else:
3675
        feedback_fn("Cluster DRBD helper already in desired state,"
3676
                    " not changing")
3677
    if self.op.hvparams:
3678
      self.cluster.hvparams = self.new_hvparams
3679
    if self.op.os_hvp:
3680
      self.cluster.os_hvp = self.new_os_hvp
3681
    if self.op.enabled_hypervisors is not None:
3682
      self.cluster.hvparams = self.new_hvparams
3683
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3684
    if self.op.beparams:
3685
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3686
    if self.op.nicparams:
3687
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3688
    if self.op.osparams:
3689
      self.cluster.osparams = self.new_osp
3690
    if self.op.ndparams:
3691
      self.cluster.ndparams = self.new_ndparams
3692

    
3693
    if self.op.candidate_pool_size is not None:
3694
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3695
      # we need to update the pool size here, otherwise the save will fail
3696
      _AdjustCandidatePool(self, [])
3697

    
3698
    if self.op.maintain_node_health is not None:
3699
      self.cluster.maintain_node_health = self.op.maintain_node_health
3700

    
3701
    if self.op.prealloc_wipe_disks is not None:
3702
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3703

    
3704
    if self.op.add_uids is not None:
3705
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3706

    
3707
    if self.op.remove_uids is not None:
3708
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3709

    
3710
    if self.op.uid_pool is not None:
3711
      self.cluster.uid_pool = self.op.uid_pool
3712

    
3713
    if self.op.default_iallocator is not None:
3714
      self.cluster.default_iallocator = self.op.default_iallocator
3715

    
3716
    if self.op.reserved_lvs is not None:
3717
      self.cluster.reserved_lvs = self.op.reserved_lvs
3718

    
3719
    if self.op.use_external_mip_script is not None:
3720
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
3721

    
3722
    def helper_os(aname, mods, desc):
3723
      desc += " OS list"
3724
      lst = getattr(self.cluster, aname)
3725
      for key, val in mods:
3726
        if key == constants.DDM_ADD:
3727
          if val in lst:
3728
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3729
          else:
3730
            lst.append(val)
3731
        elif key == constants.DDM_REMOVE:
3732
          if val in lst:
3733
            lst.remove(val)
3734
          else:
3735
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3736
        else:
3737
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3738

    
3739
    if self.op.hidden_os:
3740
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3741

    
3742
    if self.op.blacklisted_os:
3743
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3744

    
3745
    if self.op.master_netdev:
3746
      master_params = self.cfg.GetMasterNetworkParameters()
3747
      ems = self.cfg.GetUseExternalMipScript()
3748
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3749
                  self.cluster.master_netdev)
3750
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3751
                                                       master_params, ems)
3752
      result.Raise("Could not disable the master ip")
3753
      feedback_fn("Changing master_netdev from %s to %s" %
3754
                  (master_params.netdev, self.op.master_netdev))
3755
      self.cluster.master_netdev = self.op.master_netdev
3756

    
3757
    if self.op.master_netmask:
3758
      master_params = self.cfg.GetMasterNetworkParameters()
3759
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3760
      result = self.rpc.call_node_change_master_netmask(master_params.name,
3761
                                                        master_params.netmask,
3762
                                                        self.op.master_netmask,
3763
                                                        master_params.ip,
3764
                                                        master_params.netdev)
3765
      if result.fail_msg:
3766
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3767
        feedback_fn(msg)
3768

    
3769
      self.cluster.master_netmask = self.op.master_netmask
3770

    
3771
    self.cfg.Update(self.cluster, feedback_fn)
3772

    
3773
    if self.op.master_netdev:
3774
      master_params = self.cfg.GetMasterNetworkParameters()
3775
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3776
                  self.op.master_netdev)
3777
      ems = self.cfg.GetUseExternalMipScript()
3778
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3779
                                                     master_params, ems)
3780
      if result.fail_msg:
3781
        self.LogWarning("Could not re-enable the master ip on"
3782
                        " the master, please restart manually: %s",
3783
                        result.fail_msg)
3784

    
3785

    
3786
def _UploadHelper(lu, nodes, fname):
  """Upload a file to a list of nodes, logging a warning per failed copy.

  Missing local files are silently skipped.

  """
  if not os.path.exists(fname):
    return
  upload_result = lu.rpc.call_upload_file(nodes, fname)
  for (node_name, node_result) in upload_result.items():
    fail_msg = node_result.fail_msg
    if not fail_msg:
      continue
    lu.proc.LogWarning("Copy of file %s to node %s failed: %s" %
                       (fname, node_name, fail_msg))
def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Files required on every node
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(constants.RAPI_CERT_FILE)
  else:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Optional files; each must also be present in one of the other categories
  # and either exist or not exist on all nodes of that category (mc, vm, all)
  files_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files restricted to master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)
    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
    # replication
    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)

  # Files restricted to VM-capable nodes, as reported by each enabled
  # hypervisor (index 0 = required, index 1 = optional)
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])

  # Sanity check: no filename may appear in more than one category
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  # Sanity check: each optional file must belong to some category
  assert all_files_set.issuperset(files_opt), \
         "Optional file not in a different required list"

  return (files_all, files_opt, files_mc, files_vm)

def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Determine the distribution targets
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # The master node is never a distribution target
  for node_list in (online_nodes, vm_nodes):
    if master_info.name in node_list:
      node_list.remove(master_info.name)

  (files_all, _, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # The configuration file itself must never be re-distributed from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  # Upload each file category to its node set
  for (node_list, files) in [(online_nodes, files_all),
                             (vm_nodes, files_vm)]:
    for fname in files:
      _UploadHelper(lu, node_list, fname)

class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Shared lock on all nodes: we only read state and push files out
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    cluster_info = self.cfg.GetClusterInfo()
    self.cfg.Update(cluster_info, feedback_fn)
    _RedistributeAncillaryFiles(self)

class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    params = self.cfg.GetMasterNetworkParameters()
    use_external_script = self.cfg.GetUseExternalMipScript()
    self.rpc.call_node_activate_master_ip(params.name, params,
                                          use_external_script)

class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    params = self.cfg.GetMasterNetworkParameters()
    use_external_script = self.cfg.GetUseExternalMipScript()
    self.rpc.call_node_deactivate_master_ip(params.name, params,
                                            use_external_script)

def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  @param lu: the logical unit on whose behalf we poll (provides C{cfg},
      C{rpc} and logging via C{lu.proc})
  @param instance: the instance whose disks are synchronizing
  @param disks: optional list of disks to wait for; C{None} means all of
      the instance's disks
  @type oneshot: boolean
  @param oneshot: whether to poll only once instead of waiting until the
      disks are fully synchronized
  @rtype: boolean
  @return: True if no disk was degraded on the last poll, False otherwise

  """
  # Nothing to wait for when the instance has no disks, or when an
  # explicit empty disk list was requested
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  # Mirror status is always queried on the primary node
  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0  # consecutive RPC failures; reset after any successful poll
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      # RPC failed; retry up to 10 times, 6 seconds apart, before aborting
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      # A disk counts as degraded only when it is degraded while no
      # resync is in progress (sync_percent is None)
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        # Still syncing; track a time estimate for the sleep below
        # (max_time ends up holding the last device's estimate)
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # Sleep proportionally to the estimated remaining time, capped at one
    # minute; max_time is 0 when no device reported an estimate
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  healthy = True

  if on_primary or dev.AssembleOnSecondary():
    find_result = lu.rpc.call_blockdev_find(node, dev)
    err = find_result.fail_msg
    if err:
      lu.LogWarning("Can't find disk on node %s: %s", node, err)
      healthy = False
    elif not find_result.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      healthy = False
    elif ldisk:
      healthy = (healthy and
                 find_result.payload.ldisk_status == constants.LDS_OKAY)
    else:
      healthy = healthy and not find_result.payload.is_degraded

  # Recurse into child devices; the local-storage check (ldisk) is applied
  # only to the top-level device, not passed down to children
  for child in (dev.children or []):
    healthy = healthy and _CheckDiskConsistency(lu, child, node, on_primary)

  return healthy

class LUOobCommand(NoHooksLU):
4076
  """Logical unit for OOB handling.
4077

4078
  """
4079
  REG_BGL = False
4080
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
4081

    
4082
  def ExpandNames(self):
4083
    """Gather locks we need.
4084

4085
    """
4086
    if self.op.node_names:
4087
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
4088
      lock_names = self.op.node_names
4089
    else:
4090
      lock_names = locking.ALL_SET
4091

    
4092
    self.needed_locks = {
4093
      locking.LEVEL_NODE: lock_names,
4094
      }
4095

    
4096
  def CheckPrereq(self):
4097
    """Check prerequisites.
4098

4099
    This checks:
4100
     - the node exists in the configuration