root / lib / cmdlib.py @ c4de9b7a
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62

    
63
import ganeti.masterd.instance # pylint: disable=W0611
64

    
65

    
66
#: Size of DRBD meta block device
67
DRBD_META_SIZE = 128
68

    
69

    
70
class ResultWithJobs:
71
  """Data container for LU results with jobs.
72

73
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
74
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
75
  contained in the C{jobs} attribute and include the job IDs in the opcode
76
  result.
77

78
  """
79
  def __init__(self, jobs, **kwargs):
80
    """Initializes this class.
81

82
    Additional return values can be specified as keyword arguments.
83

84
    @type jobs: list of lists of L{opcode.OpCode}
85
    @param jobs: A list of lists of opcode objects
86

87
    """
88
    self.jobs = jobs
89
    self.other = kwargs
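# Illustrative sketch only (not part of the upstream module): how an LU's
# Exec method might hand follow-up work back to the job queue. The opcode
# used here, OpTestDelay, is merely a stand-in for the example, and the
# extra "submitted_by" value is invented.
#
#   def Exec(self, feedback_fn):
#     # two independent follow-up jobs, each a list of opcodes
#     jobs = [[opcodes.OpTestDelay(duration=0)],
#             [opcodes.OpTestDelay(duration=0)]]
#     # additional keyword values end up in the "other" attribute
#     return ResultWithJobs(jobs, submitted_by=self.op.OP_ID)
#
# mcpu.Processor._ProcessResult then submits the jobs and includes their IDs,
# together with the extra keyword values, in the opcode result.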
90

    
91

    
92
class LogicalUnit(object):
93
  """Logical Unit base class.
94

95
  Subclasses must follow these rules:
96
    - implement ExpandNames
97
    - implement CheckPrereq (except when tasklets are used)
98
    - implement Exec (except when tasklets are used)
99
    - implement BuildHooksEnv
100
    - implement BuildHooksNodes
101
    - redefine HPATH and HTYPE
102
    - optionally redefine their run requirements:
103
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
104

105
  Note that all commands require root permissions.
106

107
  @ivar dry_run_result: the value (if any) that will be returned to the caller
108
      in dry-run mode (signalled by opcode dry_run parameter)
109

110
  """
111
  HPATH = None
112
  HTYPE = None
113
  REQ_BGL = True
114

    
115
  def __init__(self, processor, op, context, rpc):
116
    """Constructor for LogicalUnit.
117

118
    This needs to be overridden in derived classes in order to check op
119
    validity.
120

121
    """
122
    self.proc = processor
123
    self.op = op
124
    self.cfg = context.cfg
125
    self.glm = context.glm
126
    # readability alias
127
    self.owned_locks = context.glm.list_owned
128
    self.context = context
129
    self.rpc = rpc
130
    # Dicts used to declare locking needs to mcpu
131
    self.needed_locks = None
132
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
133
    self.add_locks = {}
134
    self.remove_locks = {}
135
    # Used to force good behavior when calling helper functions
136
    self.recalculate_locks = {}
137
    # logging
138
    self.Log = processor.Log # pylint: disable=C0103
139
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
140
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
141
    self.LogStep = processor.LogStep # pylint: disable=C0103
142
    # support for dry-run
143
    self.dry_run_result = None
144
    # support for generic debug attribute
145
    if (not hasattr(self.op, "debug_level") or
146
        not isinstance(self.op.debug_level, int)):
147
      self.op.debug_level = 0
148

    
149
    # Tasklets
150
    self.tasklets = None
151

    
152
    # Validate opcode parameters and set defaults
153
    self.op.Validate(True)
154

    
155
    self.CheckArguments()
156

    
157
  def CheckArguments(self):
158
    """Check syntactic validity for the opcode arguments.
159

160
    This method is for doing a simple syntactic check and ensure
161
    validity of opcode parameters, without any cluster-related
162
    checks. While the same can be accomplished in ExpandNames and/or
163
    CheckPrereq, doing these separately is better because:
164

165
      - ExpandNames is left as purely a lock-related function
166
      - CheckPrereq is run after we have acquired locks (and possibly
167
        waited for them)
168

169
    The function is allowed to change the self.op attribute so that
170
    later methods can no longer worry about missing parameters.
171

172
    """
173
    pass
174

    
175
  def ExpandNames(self):
176
    """Expand names for this LU.
177

178
    This method is called before starting to execute the opcode, and it should
179
    update all the parameters of the opcode to their canonical form (e.g. a
180
    short node name must be fully expanded after this method has successfully
181
    completed). This way locking, hooks, logging, etc. can work correctly.
182

183
    LUs which implement this method must also populate the self.needed_locks
184
    member, as a dict with lock levels as keys, and a list of needed lock names
185
    as values. Rules:
186

187
      - use an empty dict if you don't need any lock
188
      - if you don't need any lock at a particular level omit that level
189
      - don't put anything for the BGL level
190
      - if you want all locks at a level use locking.ALL_SET as a value
191

192
    If you need to share locks (rather than acquire them exclusively) at one
193
    level you can modify self.share_locks, setting a true value (usually 1) for
194
    that level. By default locks are not shared.
195

196
    This function can also define a list of tasklets, which then will be
197
    executed in order instead of the usual LU-level CheckPrereq and Exec
198
    functions, if those are not defined by the LU.
199

200
    Examples::
201

202
      # Acquire all nodes and one instance
203
      self.needed_locks = {
204
        locking.LEVEL_NODE: locking.ALL_SET,
205
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
206
      }
207
      # Acquire just two nodes
208
      self.needed_locks = {
209
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
210
      }
211
      # Acquire no locks
212
      self.needed_locks = {} # No, you can't leave it to the default value None
213

214
    """
215
    # The implementation of this method is mandatory only if the new LU is
216
    # concurrent, so that old LUs don't need to be changed all at the same
217
    # time.
218
    if self.REQ_BGL:
219
      self.needed_locks = {} # Exclusive LUs don't need locks.
220
    else:
221
      raise NotImplementedError
222

    
223
  def DeclareLocks(self, level):
224
    """Declare LU locking needs for a level
225

226
    While most LUs can just declare their locking needs at ExpandNames time,
227
    sometimes there's the need to calculate some locks after having acquired
228
    the ones before. This function is called just before acquiring locks at a
229
    particular level, but after acquiring the ones at lower levels, and permits
230
    such calculations. It can be used to modify self.needed_locks, and by
231
    default it does nothing.
232

233
    This function is only called if you have something already set in
234
    self.needed_locks for the level.
235

236
    @param level: Locking level which is going to be locked
237
    @type level: member of ganeti.locking.LEVELS
238

239
    """
240

    
241
  def CheckPrereq(self):
242
    """Check prerequisites for this LU.
243

244
    This method should check that the prerequisites for the execution
245
    of this LU are fulfilled. It can do internode communication, but
246
    it should be idempotent - no cluster or system changes are
247
    allowed.
248

249
    The method should raise errors.OpPrereqError in case something is
250
    not fulfilled. Its return value is ignored.
251

252
    This method should also update all the parameters of the opcode to
253
    their canonical form if it hasn't been done by ExpandNames before.
254

255
    """
256
    if self.tasklets is not None:
257
      for (idx, tl) in enumerate(self.tasklets):
258
        logging.debug("Checking prerequisites for tasklet %s/%s",
259
                      idx + 1, len(self.tasklets))
260
        tl.CheckPrereq()
261
    else:
262
      pass
263

    
264
  def Exec(self, feedback_fn):
265
    """Execute the LU.
266

267
    This method should implement the actual work. It should raise
268
    errors.OpExecError for failures that are somewhat dealt with in
269
    code, or expected.
270

271
    """
272
    if self.tasklets is not None:
273
      for (idx, tl) in enumerate(self.tasklets):
274
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
275
        tl.Exec(feedback_fn)
276
    else:
277
      raise NotImplementedError
278

    
279
  def BuildHooksEnv(self):
280
    """Build hooks environment for this LU.
281

282
    @rtype: dict
283
    @return: Dictionary containing the environment that will be used for
284
      running the hooks for this LU. The keys of the dict must not be prefixed
285
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
286
      will extend the environment with additional variables. If no environment
287
      should be defined, an empty dictionary should be returned (not C{None}).
288
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
289
      will not be called.
290

291
    """
292
    raise NotImplementedError
293

    
294
  def BuildHooksNodes(self):
295
    """Build list of nodes to run LU's hooks.
296

297
    @rtype: tuple; (list, list)
298
    @return: Tuple containing a list of node names on which the hook
299
      should run before the execution and a list of node names on which the
300
      hook should run after the execution. No nodes should be returned as an
301
      empty list (and not None).
302
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
303
      will not be called.
304

305
    """
306
    raise NotImplementedError
307

    
308
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
309
    """Notify the LU about the results of its hooks.
310

311
    This method is called every time a hooks phase is executed, and notifies
312
    the Logical Unit about the hooks' result. The LU can then use it to alter
313
    its result based on the hooks.  By default the method does nothing and the
314
    previous result is passed back unchanged but any LU can define it if it
315
    wants to use the local cluster hook-scripts somehow.
316

317
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
318
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
319
    @param hook_results: the results of the multi-node hooks rpc call
320
    @param feedback_fn: function used to send feedback back to the caller
321
    @param lu_result: the previous Exec result this LU had, or None
322
        in the PRE phase
323
    @return: the new Exec result, based on the previous result
324
        and hook results
325

326
    """
327
    # The API must be kept, thus we ignore the "unused argument" and "could
328
    # be a function" pylint warnings
329
    # pylint: disable=W0613,R0201
330
    return lu_result
331

    
332
  def _ExpandAndLockInstance(self):
333
    """Helper function to expand and lock an instance.
334

335
    Many LUs that work on an instance take its name in self.op.instance_name
336
    and need to expand it and then declare the expanded name for locking. This
337
    function does it, and then updates self.op.instance_name to the expanded
338
    name. It also initializes needed_locks as a dict, if this hasn't been done
339
    before.
340

341
    """
342
    if self.needed_locks is None:
343
      self.needed_locks = {}
344
    else:
345
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
346
        "_ExpandAndLockInstance called with instance-level locks set"
347
    self.op.instance_name = _ExpandInstanceName(self.cfg,
348
                                                self.op.instance_name)
349
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
350

    
351
  def _LockInstancesNodes(self, primary_only=False):
352
    """Helper function to declare instances' nodes for locking.
353

354
    This function should be called after locking one or more instances to lock
355
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
356
    with all primary or secondary nodes for instances already locked and
357
    present in self.needed_locks[locking.LEVEL_INSTANCE].
358

359
    It should be called from DeclareLocks, and for safety only works if
360
    self.recalculate_locks[locking.LEVEL_NODE] is set.
361

362
    In the future it may grow parameters to just lock some instance's nodes, or
363
    to just lock primaries or secondary nodes, if needed.
364

365
    It should be called in DeclareLocks in a way similar to::
366

367
      if level == locking.LEVEL_NODE:
368
        self._LockInstancesNodes()
369

370
    @type primary_only: boolean
371
    @param primary_only: only lock primary nodes of locked instances
372

373
    """
374
    assert locking.LEVEL_NODE in self.recalculate_locks, \
375
      "_LockInstancesNodes helper function called with no nodes to recalculate"
376

    
377
    # TODO: check if we've really been called with the instance locks held
378

    
379
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
380
    # future we might want to have different behaviors depending on the value
381
    # of self.recalculate_locks[locking.LEVEL_NODE]
382
    wanted_nodes = []
383
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
384
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
385
      wanted_nodes.append(instance.primary_node)
386
      if not primary_only:
387
        wanted_nodes.extend(instance.secondary_nodes)
388

    
389
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
390
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
391
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
392
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
393

    
394
    del self.recalculate_locks[locking.LEVEL_NODE]
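# Illustrative sketch only (not part of the upstream module): the minimal
# shape of a concurrent LU following the rules listed in the LogicalUnit
# docstring above. The class name LUExampleNoop and the hook path
# "example-noop" are invented for the example.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}          # no locks needed
#
#     def CheckPrereq(self):
#       pass                            # nothing to verify
#
#     def BuildHooksEnv(self):
#       return {"OP_TARGET": self.cfg.GetClusterName()}
#
#     def BuildHooksNodes(self):
#       return ([], [self.cfg.GetMasterNode()])
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do")
#       return True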
395

    
396

    
397
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
398
  """Simple LU which runs no hooks.
399

400
  This LU is intended as a parent for other LogicalUnits which will
401
  run no hooks, in order to reduce duplicate code.
402

403
  """
404
  HPATH = None
405
  HTYPE = None
406

    
407
  def BuildHooksEnv(self):
408
    """Empty BuildHooksEnv for NoHooksLu.
409

410
    This just raises an error.
411

412
    """
413
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
414

    
415
  def BuildHooksNodes(self):
416
    """Empty BuildHooksNodes for NoHooksLU.
417

418
    """
419
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
420

    
421

    
422
class Tasklet:
423
  """Tasklet base class.
424

425
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
426
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
427
  tasklets know nothing about locks.
428

429
  Subclasses must follow these rules:
430
    - Implement CheckPrereq
431
    - Implement Exec
432

433
  """
434
  def __init__(self, lu):
435
    self.lu = lu
436

    
437
    # Shortcuts
438
    self.cfg = lu.cfg
439
    self.rpc = lu.rpc
440

    
441
  def CheckPrereq(self):
442
    """Check prerequisites for this tasklets.
443

444
    This method should check whether the prerequisites for the execution of
445
    this tasklet are fulfilled. It can do internode communication, but it
446
    should be idempotent - no cluster or system changes are allowed.
447

448
    The method should raise errors.OpPrereqError in case something is not
449
    fulfilled. Its return value is ignored.
450

451
    This method should also update all parameters to their canonical form if it
452
    hasn't been done before.
453

454
    """
455
    pass
456

    
457
  def Exec(self, feedback_fn):
458
    """Execute the tasklet.
459

460
    This method should implement the actual work. It should raise
461
    errors.OpExecError for failures that are somewhat dealt with in code, or
462
    expected.
463

464
    """
465
    raise NotImplementedError
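# Illustrative sketch only (not part of the upstream module): a minimal
# tasklet and how an owning LU would wire it up. The names _HypotheticalTask
# and the instance name are made up for the example.
#
#   class _HypotheticalTask(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       # idempotent checks only; locking was already handled by the LU
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Working on %s" % self.instance_name)
#
# The LU would set, e.g. in ExpandNames:
#   self.tasklets = [_HypotheticalTask(self, "instance1.example.com")]
# after which LogicalUnit.CheckPrereq and Exec iterate over the tasklets
# instead of requiring LU-level implementations.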
466

    
467

    
468
class _QueryBase:
469
  """Base for query utility classes.
470

471
  """
472
  #: Attribute holding field definitions
473
  FIELDS = None
474

    
475
  def __init__(self, qfilter, fields, use_locking):
476
    """Initializes this class.
477

478
    """
479
    self.use_locking = use_locking
480

    
481
    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
482
                             namefield="name")
483
    self.requested_data = self.query.RequestedData()
484
    self.names = self.query.RequestedNames()
485

    
486
    # Sort only if no names were requested
487
    self.sort_by_name = not self.names
488

    
489
    self.do_locking = None
490
    self.wanted = None
491

    
492
  def _GetNames(self, lu, all_names, lock_level):
493
    """Helper function to determine names asked for in the query.
494

495
    """
496
    if self.do_locking:
497
      names = lu.owned_locks(lock_level)
498
    else:
499
      names = all_names
500

    
501
    if self.wanted == locking.ALL_SET:
502
      assert not self.names
503
      # caller didn't specify names, so ordering is not important
504
      return utils.NiceSort(names)
505

    
506
    # caller specified names and we must keep the same order
507
    assert self.names
508
    assert not self.do_locking or lu.glm.is_owned(lock_level)
509

    
510
    missing = set(self.wanted).difference(names)
511
    if missing:
512
      raise errors.OpExecError("Some items were removed before retrieving"
513
                               " their data: %s" % missing)
514

    
515
    # Return expanded names
516
    return self.wanted
517

    
518
  def ExpandNames(self, lu):
519
    """Expand names for this query.
520

521
    See L{LogicalUnit.ExpandNames}.
522

523
    """
524
    raise NotImplementedError()
525

    
526
  def DeclareLocks(self, lu, level):
527
    """Declare locks for this query.
528

529
    See L{LogicalUnit.DeclareLocks}.
530

531
    """
532
    raise NotImplementedError()
533

    
534
  def _GetQueryData(self, lu):
535
    """Collects all data for this query.
536

537
    @return: Query data object
538

539
    """
540
    raise NotImplementedError()
541

    
542
  def NewStyleQuery(self, lu):
543
    """Collect data and execute query.
544

545
    """
546
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
547
                                  sort_by_name=self.sort_by_name)
548

    
549
  def OldStyleQuery(self, lu):
550
    """Collect data and execute query.
551

552
    """
553
    return self.query.OldStyleQuery(self._GetQueryData(lu),
554
                                    sort_by_name=self.sort_by_name)
555

    
556

    
557
def _ShareAll():
558
  """Returns a dict declaring all lock levels shared.
559

560
  """
561
  return dict.fromkeys(locking.LEVELS, 1)
562

    
563

    
564
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
565
  """Checks if the owned node groups are still correct for an instance.
566

567
  @type cfg: L{config.ConfigWriter}
568
  @param cfg: The cluster configuration
569
  @type instance_name: string
570
  @param instance_name: Instance name
571
  @type owned_groups: set or frozenset
572
  @param owned_groups: List of currently owned node groups
573

574
  """
575
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
576

    
577
  if not owned_groups.issuperset(inst_groups):
578
    raise errors.OpPrereqError("Instance %s's node groups changed since"
579
                               " locks were acquired, current groups are"
580
                               " are '%s', owning groups '%s'; retry the"
581
                               " operation" %
582
                               (instance_name,
583
                                utils.CommaJoin(inst_groups),
584
                                utils.CommaJoin(owned_groups)),
585
                               errors.ECODE_STATE)
586

    
587
  return inst_groups
588

    
589

    
590
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
591
  """Checks if the instances in a node group are still correct.
592

593
  @type cfg: L{config.ConfigWriter}
594
  @param cfg: The cluster configuration
595
  @type group_uuid: string
596
  @param group_uuid: Node group UUID
597
  @type owned_instances: set or frozenset
598
  @param owned_instances: List of currently owned instances
599

600
  """
601
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
602
  if owned_instances != wanted_instances:
603
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
604
                               " locks were acquired, wanted '%s', have '%s';"
605
                               " retry the operation" %
606
                               (group_uuid,
607
                                utils.CommaJoin(wanted_instances),
608
                                utils.CommaJoin(owned_instances)),
609
                               errors.ECODE_STATE)
610

    
611
  return wanted_instances
612

    
613

    
614
def _SupportsOob(cfg, node):
615
  """Tells if node supports OOB.
616

617
  @type cfg: L{config.ConfigWriter}
618
  @param cfg: The cluster configuration
619
  @type node: L{objects.Node}
620
  @param node: The node
621
  @return: The OOB script if supported or an empty string otherwise
622

623
  """
624
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
625

    
626

    
627
def _GetWantedNodes(lu, nodes):
628
  """Returns list of checked and expanded node names.
629

630
  @type lu: L{LogicalUnit}
631
  @param lu: the logical unit on whose behalf we execute
632
  @type nodes: list
633
  @param nodes: list of node names or None for all nodes
634
  @rtype: list
635
  @return: the list of nodes, sorted
636
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
637

638
  """
639
  if nodes:
640
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
641

    
642
  return utils.NiceSort(lu.cfg.GetNodeList())
643

    
644

    
645
def _GetWantedInstances(lu, instances):
646
  """Returns list of checked and expanded instance names.
647

648
  @type lu: L{LogicalUnit}
649
  @param lu: the logical unit on whose behalf we execute
650
  @type instances: list
651
  @param instances: list of instance names or None for all instances
652
  @rtype: list
653
  @return: the list of instances, sorted
654
  @raise errors.OpPrereqError: if the instances parameter is wrong type
655
  @raise errors.OpPrereqError: if any of the passed instances is not found
656

657
  """
658
  if instances:
659
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
660
  else:
661
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
662
  return wanted
663

    
664

    
665
def _GetUpdatedParams(old_params, update_dict,
666
                      use_default=True, use_none=False):
667
  """Return the new version of a parameter dictionary.
668

669
  @type old_params: dict
670
  @param old_params: old parameters
671
  @type update_dict: dict
672
  @param update_dict: dict containing new parameter values, or
673
      constants.VALUE_DEFAULT to reset the parameter to its default
674
      value
675
  @type use_default: boolean
676
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
677
      values as 'to be deleted' values
678
  @type use_none: boolean
679
  @param use_none: whether to recognise C{None} values as 'to be
680
      deleted' values
681
  @rtype: dict
682
  @return: the new parameter dictionary
683

684
  """
685
  params_copy = copy.deepcopy(old_params)
686
  for key, val in update_dict.iteritems():
687
    if ((use_default and val == constants.VALUE_DEFAULT) or
688
        (use_none and val is None)):
689
      try:
690
        del params_copy[key]
691
      except KeyError:
692
        pass
693
    else:
694
      params_copy[key] = val
695
  return params_copy
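# Illustrative behaviour of _GetUpdatedParams (the parameter names and values
# below are examples only):
#
#   old = {"mem": 128, "vcpus": 2}
#   upd = {"mem": constants.VALUE_DEFAULT, "vcpus": 4, "nic": "br0"}
#   _GetUpdatedParams(old, upd)
#   --> {"vcpus": 4, "nic": "br0"}   # "mem" reset to its default (removed)
#
# With use_none=True, passing None for a key removes it in the same way.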
696

    
697

    
698
def _ReleaseLocks(lu, level, names=None, keep=None):
699
  """Releases locks owned by an LU.
700

701
  @type lu: L{LogicalUnit}
702
  @param level: Lock level
703
  @type names: list or None
704
  @param names: Names of locks to release
705
  @type keep: list or None
706
  @param keep: Names of locks to retain
707

708
  """
709
  assert not (keep is not None and names is not None), \
710
         "Only one of the 'names' and the 'keep' parameters can be given"
711

    
712
  if names is not None:
713
    should_release = names.__contains__
714
  elif keep:
715
    should_release = lambda name: name not in keep
716
  else:
717
    should_release = None
718

    
719
  if should_release:
720
    retain = []
721
    release = []
722

    
723
    # Determine which locks to release
724
    for name in lu.owned_locks(level):
725
      if should_release(name):
726
        release.append(name)
727
      else:
728
        retain.append(name)
729

    
730
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
731

    
732
    # Release just some locks
733
    lu.glm.release(level, names=release)
734

    
735
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
736
  else:
737
    # Release everything
738
    lu.glm.release(level)
739

    
740
    assert not lu.glm.is_owned(level), "No locks should be owned"
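# Illustrative use only (the node names are made up): after an LU has
# narrowed down the nodes it actually needs, it can drop the remaining node
# locks early:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=["node1.example.com", "node2.example.com"])
#
# or release every lock at that level once the work is done:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE)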
741

    
742

    
743
def _MapInstanceDisksToNodes(instances):
744
  """Creates a map from (node, volume) to instance name.
745

746
  @type instances: list of L{objects.Instance}
747
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
748

749
  """
750
  return dict(((node, vol), inst.name)
751
              for inst in instances
752
              for (node, vols) in inst.MapLVsByNode().items()
753
              for vol in vols)
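# The resulting mapping looks like (illustrative node, volume and instance
# names only):
#   {("node1.example.com", "xenvg/disk0"): "instance1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "instance1.example.com"}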
754

    
755

    
756
def _RunPostHook(lu, node_name):
757
  """Runs the post-hook for an opcode on a single node.
758

759
  """
760
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
761
  try:
762
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
763
  except:
764
    # pylint: disable=W0702
765
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
766

    
767

    
768
def _CheckOutputFields(static, dynamic, selected):
769
  """Checks whether all selected fields are valid.
770

771
  @type static: L{utils.FieldSet}
772
  @param static: static fields set
773
  @type dynamic: L{utils.FieldSet}
774
  @param dynamic: dynamic fields set
775

776
  """
777
  f = utils.FieldSet()
778
  f.Extend(static)
779
  f.Extend(dynamic)
780

    
781
  delta = f.NonMatching(selected)
782
  if delta:
783
    raise errors.OpPrereqError("Unknown output fields selected: %s"
784
                               % ",".join(delta), errors.ECODE_INVAL)
785

    
786

    
787
def _CheckGlobalHvParams(params):
788
  """Validates that given hypervisor params are not global ones.
789

790
  This will ensure that instances don't get customised versions of
791
  global params.
792

793
  """
794
  used_globals = constants.HVC_GLOBALS.intersection(params)
795
  if used_globals:
796
    msg = ("The following hypervisor parameters are global and cannot"
797
           " be customized at instance level, please modify them at"
798
           " cluster level: %s" % utils.CommaJoin(used_globals))
799
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
800

    
801

    
802
def _CheckNodeOnline(lu, node, msg=None):
803
  """Ensure that a given node is online.
804

805
  @param lu: the LU on behalf of which we make the check
806
  @param node: the node to check
807
  @param msg: if passed, should be a message to replace the default one
808
  @raise errors.OpPrereqError: if the node is offline
809

810
  """
811
  if msg is None:
812
    msg = "Can't use offline node"
813
  if lu.cfg.GetNodeInfo(node).offline:
814
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
815

    
816

    
817
def _CheckNodeNotDrained(lu, node):
818
  """Ensure that a given node is not drained.
819

820
  @param lu: the LU on behalf of which we make the check
821
  @param node: the node to check
822
  @raise errors.OpPrereqError: if the node is drained
823

824
  """
825
  if lu.cfg.GetNodeInfo(node).drained:
826
    raise errors.OpPrereqError("Can't use drained node %s" % node,
827
                               errors.ECODE_STATE)
828

    
829

    
830
def _CheckNodeVmCapable(lu, node):
831
  """Ensure that a given node is vm capable.
832

833
  @param lu: the LU on behalf of which we make the check
834
  @param node: the node to check
835
  @raise errors.OpPrereqError: if the node is not vm capable
836

837
  """
838
  if not lu.cfg.GetNodeInfo(node).vm_capable:
839
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
840
                               errors.ECODE_STATE)
841

    
842

    
843
def _CheckNodeHasOS(lu, node, os_name, force_variant):
844
  """Ensure that a node supports a given OS.
845

846
  @param lu: the LU on behalf of which we make the check
847
  @param node: the node to check
848
  @param os_name: the OS to query about
849
  @param force_variant: whether to ignore variant errors
850
  @raise errors.OpPrereqError: if the node is not supporting the OS
851

852
  """
853
  result = lu.rpc.call_os_get(node, os_name)
854
  result.Raise("OS '%s' not in supported OS list for node %s" %
855
               (os_name, node),
856
               prereq=True, ecode=errors.ECODE_INVAL)
857
  if not force_variant:
858
    _CheckOSVariant(result.payload, os_name)
859

    
860

    
861
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
862
  """Ensure that a node has the given secondary ip.
863

864
  @type lu: L{LogicalUnit}
865
  @param lu: the LU on behalf of which we make the check
866
  @type node: string
867
  @param node: the node to check
868
  @type secondary_ip: string
869
  @param secondary_ip: the ip to check
870
  @type prereq: boolean
871
  @param prereq: whether to throw a prerequisite or an execute error
872
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
873
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
874

875
  """
876
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
877
  result.Raise("Failure checking secondary ip on node %s" % node,
878
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
879
  if not result.payload:
880
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
881
           " please fix and re-run this command" % secondary_ip)
882
    if prereq:
883
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
884
    else:
885
      raise errors.OpExecError(msg)
886

    
887

    
888
def _GetClusterDomainSecret():
889
  """Reads the cluster domain secret.
890

891
  """
892
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
893
                               strict=True)
894

    
895

    
896
def _CheckInstanceDown(lu, instance, reason):
897
  """Ensure that an instance is not running."""
898
  if instance.admin_up:
899
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
900
                               (instance.name, reason), errors.ECODE_STATE)
901

    
902
  pnode = instance.primary_node
903
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
904
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
905
              prereq=True, ecode=errors.ECODE_ENVIRON)
906

    
907
  if instance.name in ins_l.payload:
908
    raise errors.OpPrereqError("Instance %s is running, %s" %
909
                               (instance.name, reason), errors.ECODE_STATE)
910

    
911

    
912
def _ExpandItemName(fn, name, kind):
913
  """Expand an item name.
914

915
  @param fn: the function to use for expansion
916
  @param name: requested item name
917
  @param kind: text description ('Node' or 'Instance')
918
  @return: the resolved (full) name
919
  @raise errors.OpPrereqError: if the item is not found
920

921
  """
922
  full_name = fn(name)
923
  if full_name is None:
924
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
925
                               errors.ECODE_NOENT)
926
  return full_name
927

    
928

    
929
def _ExpandNodeName(cfg, name):
930
  """Wrapper over L{_ExpandItemName} for nodes."""
931
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
932

    
933

    
934
def _ExpandInstanceName(cfg, name):
935
  """Wrapper over L{_ExpandItemName} for instance."""
936
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
937

    
938

    
939
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
940
                          memory, vcpus, nics, disk_template, disks,
941
                          bep, hvp, hypervisor_name, tags):
942
  """Builds instance related env variables for hooks
943

944
  This builds the hook environment from individual variables.
945

946
  @type name: string
947
  @param name: the name of the instance
948
  @type primary_node: string
949
  @param primary_node: the name of the instance's primary node
950
  @type secondary_nodes: list
951
  @param secondary_nodes: list of secondary nodes as strings
952
  @type os_type: string
953
  @param os_type: the name of the instance's OS
954
  @type status: boolean
955
  @param status: the should_run status of the instance
956
  @type memory: string
957
  @param memory: the memory size of the instance
958
  @type vcpus: string
959
  @param vcpus: the count of VCPUs the instance has
960
  @type nics: list
961
  @param nics: list of tuples (ip, mac, mode, link) representing
962
      the NICs the instance has
963
  @type disk_template: string
964
  @param disk_template: the disk template of the instance
965
  @type disks: list
966
  @param disks: the list of (size, mode) pairs
967
  @type bep: dict
968
  @param bep: the backend parameters for the instance
969
  @type hvp: dict
970
  @param hvp: the hypervisor parameters for the instance
971
  @type hypervisor_name: string
972
  @param hypervisor_name: the hypervisor for the instance
973
  @type tags: list
974
  @param tags: list of instance tags as strings
975
  @rtype: dict
976
  @return: the hook environment for this instance
977

978
  """
979
  if status:
980
    str_status = "up"
981
  else:
982
    str_status = "down"
983
  env = {
984
    "OP_TARGET": name,
985
    "INSTANCE_NAME": name,
986
    "INSTANCE_PRIMARY": primary_node,
987
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
988
    "INSTANCE_OS_TYPE": os_type,
989
    "INSTANCE_STATUS": str_status,
990
    "INSTANCE_MEMORY": memory,
991
    "INSTANCE_VCPUS": vcpus,
992
    "INSTANCE_DISK_TEMPLATE": disk_template,
993
    "INSTANCE_HYPERVISOR": hypervisor_name,
994
  }
995

    
996
  if nics:
997
    nic_count = len(nics)
998
    for idx, (ip, mac, mode, link) in enumerate(nics):
999
      if ip is None:
1000
        ip = ""
1001
      env["INSTANCE_NIC%d_IP" % idx] = ip
1002
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1003
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1004
      env["INSTANCE_NIC%d_LINK" % idx] = link
1005
      if mode == constants.NIC_MODE_BRIDGED:
1006
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1007
  else:
1008
    nic_count = 0
1009

    
1010
  env["INSTANCE_NIC_COUNT"] = nic_count
1011

    
1012
  if disks:
1013
    disk_count = len(disks)
1014
    for idx, (size, mode) in enumerate(disks):
1015
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1016
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1017
  else:
1018
    disk_count = 0
1019

    
1020
  env["INSTANCE_DISK_COUNT"] = disk_count
1021

    
1022
  if not tags:
1023
    tags = []
1024

    
1025
  env["INSTANCE_TAGS"] = " ".join(tags)
1026

    
1027
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1028
    for key, value in source.items():
1029
      env["INSTANCE_%s_%s" % (kind, key)] = value
1030

    
1031
  return env
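# Illustrative excerpt of the resulting environment for an instance with one
# bridged NIC and one disk (all values are examples only):
#
#   {
#     "OP_TARGET": "instance1.example.com",
#     "INSTANCE_NAME": "instance1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     ...
#   }
#
# The hooks runner later prefixes every key with "GANETI_".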
1032

    
1033

    
1034
def _NICListToTuple(lu, nics):
1035
  """Build a list of nic information tuples.
1036

1037
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1038
  value in LUInstanceQueryData.
1039

1040
  @type lu:  L{LogicalUnit}
1041
  @param lu: the logical unit on whose behalf we execute
1042
  @type nics: list of L{objects.NIC}
1043
  @param nics: list of nics to convert to hooks tuples
1044

1045
  """
1046
  hooks_nics = []
1047
  cluster = lu.cfg.GetClusterInfo()
1048
  for nic in nics:
1049
    ip = nic.ip
1050
    mac = nic.mac
1051
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1052
    mode = filled_params[constants.NIC_MODE]
1053
    link = filled_params[constants.NIC_LINK]
1054
    hooks_nics.append((ip, mac, mode, link))
1055
  return hooks_nics
1056

    
1057

    
1058
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1059
  """Builds instance related env variables for hooks from an object.
1060

1061
  @type lu: L{LogicalUnit}
1062
  @param lu: the logical unit on whose behalf we execute
1063
  @type instance: L{objects.Instance}
1064
  @param instance: the instance for which we should build the
1065
      environment
1066
  @type override: dict
1067
  @param override: dictionary with key/values that will override
1068
      our values
1069
  @rtype: dict
1070
  @return: the hook environment dictionary
1071

1072
  """
1073
  cluster = lu.cfg.GetClusterInfo()
1074
  bep = cluster.FillBE(instance)
1075
  hvp = cluster.FillHV(instance)
1076
  args = {
1077
    "name": instance.name,
1078
    "primary_node": instance.primary_node,
1079
    "secondary_nodes": instance.secondary_nodes,
1080
    "os_type": instance.os,
1081
    "status": instance.admin_up,
1082
    "memory": bep[constants.BE_MEMORY],
1083
    "vcpus": bep[constants.BE_VCPUS],
1084
    "nics": _NICListToTuple(lu, instance.nics),
1085
    "disk_template": instance.disk_template,
1086
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1087
    "bep": bep,
1088
    "hvp": hvp,
1089
    "hypervisor_name": instance.hypervisor,
1090
    "tags": instance.tags,
1091
  }
1092
  if override:
1093
    args.update(override)
1094
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1095

    
1096

    
1097
def _AdjustCandidatePool(lu, exceptions):
1098
  """Adjust the candidate pool after node operations.
1099

1100
  """
1101
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1102
  if mod_list:
1103
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1104
               utils.CommaJoin(node.name for node in mod_list))
1105
    for name in mod_list:
1106
      lu.context.ReaddNode(name)
1107
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1108
  if mc_now > mc_max:
1109
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1110
               (mc_now, mc_max))
1111

    
1112

    
1113
def _DecideSelfPromotion(lu, exceptions=None):
1114
  """Decide whether I should promote myself as a master candidate.
1115

1116
  """
1117
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1118
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1119
  # the new node will increase mc_max with one, so:
1120
  mc_should = min(mc_should + 1, cp_size)
1121
  return mc_now < mc_should
1122

    
1123

    
1124
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1125
  """Check that the brigdes needed by a list of nics exist.
1126

1127
  """
1128
  cluster = lu.cfg.GetClusterInfo()
1129
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1130
  brlist = [params[constants.NIC_LINK] for params in paramslist
1131
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1132
  if brlist:
1133
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1134
    result.Raise("Error checking bridges on destination node '%s'" %
1135
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1136

    
1137

    
1138
def _CheckInstanceBridgesExist(lu, instance, node=None):
1139
  """Check that the brigdes needed by an instance exist.
1140

1141
  """
1142
  if node is None:
1143
    node = instance.primary_node
1144
  _CheckNicsBridgesExist(lu, instance.nics, node)
1145

    
1146

    
1147
def _CheckOSVariant(os_obj, name):
1148
  """Check whether an OS name conforms to the os variants specification.
1149

1150
  @type os_obj: L{objects.OS}
1151
  @param os_obj: OS object to check
1152
  @type name: string
1153
  @param name: OS name passed by the user, to check for validity
1154

1155
  """
1156
  variant = objects.OS.GetVariant(name)
1157
  if not os_obj.supported_variants:
1158
    if variant:
1159
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1160
                                 " passed)" % (os_obj.name, variant),
1161
                                 errors.ECODE_INVAL)
1162
    return
1163
  if not variant:
1164
    raise errors.OpPrereqError("OS name must include a variant",
1165
                               errors.ECODE_INVAL)
1166

    
1167
  if variant not in os_obj.supported_variants:
1168
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1169

    
1170

    
1171
def _GetNodeInstancesInner(cfg, fn):
1172
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1173

    
1174

    
1175
def _GetNodeInstances(cfg, node_name):
1176
  """Returns a list of all primary and secondary instances on a node.
1177

1178
  """
1179

    
1180
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1181

    
1182

    
1183
def _GetNodePrimaryInstances(cfg, node_name):
1184
  """Returns primary instances on a node.
1185

1186
  """
1187
  return _GetNodeInstancesInner(cfg,
1188
                                lambda inst: node_name == inst.primary_node)
1189

    
1190

    
1191
def _GetNodeSecondaryInstances(cfg, node_name):
1192
  """Returns secondary instances on a node.
1193

1194
  """
1195
  return _GetNodeInstancesInner(cfg,
1196
                                lambda inst: node_name in inst.secondary_nodes)
1197

    
1198

    
1199
def _GetStorageTypeArgs(cfg, storage_type):
1200
  """Returns the arguments for a storage type.
1201

1202
  """
1203
  # Special case for file storage
1204
  if storage_type == constants.ST_FILE:
1205
    # storage.FileStorage wants a list of storage directories
1206
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1207

    
1208
  return []
1209

    
1210

    
1211
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1212
  faulty = []
1213

    
1214
  for dev in instance.disks:
1215
    cfg.SetDiskID(dev, node_name)
1216

    
1217
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1218
  result.Raise("Failed to get disk status from node %s" % node_name,
1219
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1220

    
1221
  for idx, bdev_status in enumerate(result.payload):
1222
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1223
      faulty.append(idx)
1224

    
1225
  return faulty
1226

    
1227

    
1228
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1229
  """Check the sanity of iallocator and node arguments and use the
1230
  cluster-wide iallocator if appropriate.
1231

1232
  Check that at most one of (iallocator, node) is specified. If none is
1233
  specified, then the LU's opcode's iallocator slot is filled with the
1234
  cluster-wide default iallocator.
1235

1236
  @type iallocator_slot: string
1237
  @param iallocator_slot: the name of the opcode iallocator slot
1238
  @type node_slot: string
1239
  @param node_slot: the name of the opcode target node slot
1240

1241
  """
1242
  node = getattr(lu.op, node_slot, None)
1243
  iallocator = getattr(lu.op, iallocator_slot, None)
1244

    
1245
  if node is not None and iallocator is not None:
1246
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1247
                               errors.ECODE_INVAL)
1248
  elif node is None and iallocator is None:
1249
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1250
    if default_iallocator:
1251
      setattr(lu.op, iallocator_slot, default_iallocator)
1252
    else:
1253
      raise errors.OpPrereqError("No iallocator or node given and no"
1254
                                 " cluster-wide default iallocator found;"
1255
                                 " please specify either an iallocator or a"
1256
                                 " node, or set a cluster-wide default"
1257
                                 " iallocator")
1258

    
1259

    
1260
def _GetDefaultIAllocator(cfg, iallocator):
1261
  """Decides on which iallocator to use.
1262

1263
  @type cfg: L{config.ConfigWriter}
1264
  @param cfg: Cluster configuration object
1265
  @type iallocator: string or None
1266
  @param iallocator: Iallocator specified in opcode
1267
  @rtype: string
1268
  @return: Iallocator name
1269

1270
  """
1271
  if not iallocator:
1272
    # Use default iallocator
1273
    iallocator = cfg.GetDefaultIAllocator()
1274

    
1275
  if not iallocator:
1276
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1277
                               " opcode nor as a cluster-wide default",
1278
                               errors.ECODE_INVAL)
1279

    
1280
  return iallocator
1281

    
1282

    
1283
class LUClusterPostInit(LogicalUnit):
1284
  """Logical unit for running hooks after cluster initialization.
1285

1286
  """
1287
  HPATH = "cluster-init"
1288
  HTYPE = constants.HTYPE_CLUSTER
1289

    
1290
  def BuildHooksEnv(self):
1291
    """Build hooks env.
1292

1293
    """
1294
    return {
1295
      "OP_TARGET": self.cfg.GetClusterName(),
1296
      }
1297

    
1298
  def BuildHooksNodes(self):
1299
    """Build hooks nodes.
1300

1301
    """
1302
    return ([], [self.cfg.GetMasterNode()])
1303

    
1304
  def Exec(self, feedback_fn):
1305
    """Nothing to do.
1306

1307
    """
1308
    return True
1309

    
1310

    
1311
class LUClusterDestroy(LogicalUnit):
1312
  """Logical unit for destroying the cluster.
1313

1314
  """
1315
  HPATH = "cluster-destroy"
1316
  HTYPE = constants.HTYPE_CLUSTER
1317

    
1318
  def BuildHooksEnv(self):
1319
    """Build hooks env.
1320

1321
    """
1322
    return {
1323
      "OP_TARGET": self.cfg.GetClusterName(),
1324
      }
1325

    
1326
  def BuildHooksNodes(self):
1327
    """Build hooks nodes.
1328

1329
    """
1330
    return ([], [])
1331

    
1332
  def CheckPrereq(self):
1333
    """Check prerequisites.
1334

1335
    This checks whether the cluster is empty.
1336

1337
    Any errors are signaled by raising errors.OpPrereqError.
1338

1339
    """
1340
    master = self.cfg.GetMasterNode()
1341

    
1342
    nodelist = self.cfg.GetNodeList()
1343
    if len(nodelist) != 1 or nodelist[0] != master:
1344
      raise errors.OpPrereqError("There are still %d node(s) in"
1345
                                 " this cluster." % (len(nodelist) - 1),
1346
                                 errors.ECODE_INVAL)
1347
    instancelist = self.cfg.GetInstanceList()
1348
    if instancelist:
1349
      raise errors.OpPrereqError("There are still %d instance(s) in"
1350
                                 " this cluster." % len(instancelist),
1351
                                 errors.ECODE_INVAL)
1352

    
1353
  def Exec(self, feedback_fn):
1354
    """Destroys the cluster.
1355

1356
    """
1357
    master = self.cfg.GetMasterNode()
1358

    
1359
    # Run post hooks on master node before it's removed
1360
    _RunPostHook(self, master)
1361

    
1362
    result = self.rpc.call_node_deactivate_master_ip(master)
1363
    result.Raise("Could not disable the master role")
1364

    
1365
    return master
1366

    
1367

    
1368
def _VerifyCertificate(filename):
1369
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1370

1371
  @type filename: string
1372
  @param filename: Path to PEM file
1373

1374
  """
1375
  try:
1376
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1377
                                           utils.ReadFile(filename))
1378
  except Exception, err: # pylint: disable=W0703
1379
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1380
            "Failed to load X509 certificate %s: %s" % (filename, err))
1381

    
1382
  (errcode, msg) = \
1383
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1384
                                constants.SSL_CERT_EXPIRATION_ERROR)
1385

    
1386
  if msg:
1387
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1388
  else:
1389
    fnamemsg = None
1390

    
1391
  if errcode is None:
1392
    return (None, fnamemsg)
1393
  elif errcode == utils.CERT_WARNING:
1394
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1395
  elif errcode == utils.CERT_ERROR:
1396
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1397

    
1398
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1399

    
1400

    
1401
def _GetAllHypervisorParameters(cluster, instances):
1402
  """Compute the set of all hypervisor parameters.
1403

1404
  @type cluster: L{objects.Cluster}
1405
  @param cluster: the cluster object
1406
  @param instances: list of L{objects.Instance}
1407
  @param instances: additional instances from which to obtain parameters
1408
  @rtype: list of (origin, hypervisor, parameters)
1409
  @return: a list with all parameters found, indicating the hypervisor they
1410
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1411

1412
  """
1413
  hvp_data = []
1414

    
1415
  for hv_name in cluster.enabled_hypervisors:
1416
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1417

    
1418
  for os_name, os_hvp in cluster.os_hvp.items():
1419
    for hv_name, hv_params in os_hvp.items():
1420
      if hv_params:
1421
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1422
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1423

    
1424
  # TODO: collapse identical parameter values in a single one
1425
  for instance in instances:
1426
    if instance.hvparams:
1427
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1428
                       cluster.FillHV(instance)))
1429

    
1430
  return hvp_data
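# Illustrative shape of the returned list (hypervisor names, OS name and
# parameter contents are examples only):
#
#   [("cluster", "xen-pvm", {"kernel_path": "/boot/vmlinuz-xenU", ...}),
#    ("os lenny-image", "xen-pvm", {...}),
#    ("instance instance1.example.com", "kvm", {...})]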
1431

    
1432

    
1433
class _VerifyErrors(object):
1434
  """Mix-in for cluster/group verify LUs.
1435

1436
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1437
  self.op and self._feedback_fn to be available.)
1438

1439
  """
1440

    
1441
  ETYPE_FIELD = "code"
1442
  ETYPE_ERROR = "ERROR"
1443
  ETYPE_WARNING = "WARNING"
1444

    
1445
  def _Error(self, ecode, item, msg, *args, **kwargs):
1446
    """Format an error message.
1447

1448
    Based on the opcode's error_codes parameter, either format a
1449
    parseable error code, or a simpler error string.
1450

1451
    This must be called only from Exec and functions called from Exec.
1452

1453
    """
1454
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1455
    itype, etxt, _ = ecode
1456
    # first complete the msg
1457
    if args:
1458
      msg = msg % args
1459
    # then format the whole message
1460
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1461
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1462
    else:
1463
      if item:
1464
        item = " " + item
1465
      else:
1466
        item = ""
1467
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1468
    # and finally report it via the feedback_fn
1469
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1470

    
1471
  def _ErrorIf(self, cond, ecode, *args, **kwargs):
1472
    """Log an error message if the passed condition is True.
1473

1474
    """
1475
    cond = (bool(cond)
1476
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1477

    
1478
    # If the error code is in the list of ignored errors, demote the error to a
1479
    # warning
1480
    (_, etxt, _) = ecode
1481
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1482
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1483

    
1484
    if cond:
1485
      self._Error(ecode, *args, **kwargs)
1486

    
1487
    # do not mark the operation as failed for WARN cases only
1488
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1489
      self.bad = self.bad or cond
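# Illustrative use from a verify LU's Exec (the condition, error code and
# message are examples only):
#
#   self._ErrorIf(test_failed, constants.CV_ENODESSH, node_name,
#                 "ssh communication with node '%s' failed", node_name)
#
# Errors whose code is listed in the opcode's ignore_errors are demoted to
# warnings and do not mark the operation as failed.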
1490

    
1491

    
1492
class LUClusterVerify(NoHooksLU):
1493
  """Submits all jobs necessary to verify the cluster.
1494

1495
  """
1496
  REQ_BGL = False
1497

    
1498
  def ExpandNames(self):
1499
    self.needed_locks = {}
1500

    
1501
  def Exec(self, feedback_fn):
1502
    jobs = []
1503

    
1504
    if self.op.group_name:
1505
      groups = [self.op.group_name]
1506
      depends_fn = lambda: None
1507
    else:
1508
      groups = self.cfg.GetNodeGroupList()
1509

    
1510
      # Verify global configuration
1511
      jobs.append([
1512
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1513
        ])
1514

    
1515
      # Always depend on global verification
1516
      depends_fn = lambda: [(-len(jobs), [])]
1517

    
1518
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1519
                                            ignore_errors=self.op.ignore_errors,
1520
                                            depends=depends_fn())]
1521
                for group in groups)
1522

    
1523
    # Fix up all parameters
1524
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1525
      op.debug_simulate_errors = self.op.debug_simulate_errors
1526
      op.verbose = self.op.verbose
1527
      op.error_codes = self.op.error_codes
1528
      try:
1529
        op.skip_checks = self.op.skip_checks
1530
      except AttributeError:
1531
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1532

    
1533
    return ResultWithJobs(jobs)
1534

    
1535

    
1536
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1537
  """Verifies the cluster config.
1538

1539
  """
1540
  REQ_BGL = True
1541

    
1542
  def _VerifyHVP(self, hvp_data):
1543
    """Verifies locally the syntax of the hypervisor parameters.
1544

1545
    """
1546
    for item, hv_name, hv_params in hvp_data:
1547
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1548
             (hv_name, item))
1549
      try:
1550
        hv_class = hypervisor.GetHypervisor(hv_name)
1551
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1552
        hv_class.CheckParameterSyntax(hv_params)
1553
      except errors.GenericError, err:
1554
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1555

    
1556
  def ExpandNames(self):
1557
    # Information can be safely retrieved as the BGL is acquired in exclusive
1558
    # mode
1559
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1560
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1561
    self.all_node_info = self.cfg.GetAllNodesInfo()
1562
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1563
    self.needed_locks = {}
1564

    
1565
  def Exec(self, feedback_fn):
1566
    """Verify integrity of cluster, performing various test on nodes.
1567

1568
    """
1569
    self.bad = False
1570
    self._feedback_fn = feedback_fn
1571

    
1572
    feedback_fn("* Verifying cluster config")
1573

    
1574
    for msg in self.cfg.VerifyConfig():
1575
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1576

    
1577
    feedback_fn("* Verifying cluster certificate files")
1578

    
1579
    for cert_filename in constants.ALL_CERT_FILES:
1580
      (errcode, msg) = _VerifyCertificate(cert_filename)
1581
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1582

    
1583
    feedback_fn("* Verifying hypervisor parameters")
1584

    
1585
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1586
                                                self.all_inst_info.values()))
1587

    
1588
    feedback_fn("* Verifying all nodes belong to an existing group")
1589

    
1590
    # We do this verification here because, should this bogus circumstance
1591
    # occur, it would never be caught by VerifyGroup, which only acts on
1592
    # nodes/instances reachable from existing node groups.
1593

    
1594
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1595
                         if node.group not in self.all_group_info)
1596

    
1597
    dangling_instances = {}
1598
    no_node_instances = []
1599

    
1600
    for inst in self.all_inst_info.values():
1601
      if inst.primary_node in dangling_nodes:
1602
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1603
      elif inst.primary_node not in self.all_node_info:
1604
        no_node_instances.append(inst.name)
1605

    
1606
    pretty_dangling = [
1607
        "%s (%s)" %
1608
        (node.name,
1609
         utils.CommaJoin(dangling_instances.get(node.name,
1610
                                                ["no instances"])))
1611
        for node in dangling_nodes]
1612

    
1613
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1614
                  None,
1615
                  "the following nodes (and their instances) belong to a non"
1616
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1617

    
1618
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1619
                  None,
1620
                  "the following instances have a non-existing primary-node:"
1621
                  " %s", utils.CommaJoin(no_node_instances))
1622

    
1623
    return not self.bad
1624

    
1625

    
1626
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
1773
    """Perform some basic validation on data returned from a node.
1774

1775
      - check the result data structure is well formed and has all the
1776
        mandatory fields
1777
      - check ganeti version
1778

1779
    @type ninfo: L{objects.Node}
1780
    @param ninfo: the node to check
1781
    @param nresult: the results from the node
1782
    @rtype: boolean
1783
    @return: whether overall this call was successful (and we can expect
1784
         reasonable values in the response)
1785

1786
    """
1787
    node = ninfo.name
1788
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1789

    
1790
    # main result, nresult should be a non-empty dict
1791
    test = not nresult or not isinstance(nresult, dict)
1792
    _ErrorIf(test, constants.CV_ENODERPC, node,
1793
                  "unable to verify node: no data returned")
1794
    if test:
1795
      return False
1796

    
1797
    # compares ganeti version
1798
    local_version = constants.PROTOCOL_VERSION
1799
    remote_version = nresult.get("version", None)
1800
    test = not (remote_version and
1801
                isinstance(remote_version, (list, tuple)) and
1802
                len(remote_version) == 2)
1803
    _ErrorIf(test, constants.CV_ENODERPC, node,
1804
             "connection to node returned invalid data")
1805
    if test:
1806
      return False
1807

    
1808
    test = local_version != remote_version[0]
1809
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1810
             "incompatible protocol versions: master %s,"
1811
             " node %s", local_version, remote_version[0])
1812
    if test:
1813
      return False
1814

    
1815
    # node seems compatible, we can actually try to look into its results
1816

    
1817
    # full package version
1818
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1819
                  constants.CV_ENODEVERSION, node,
1820
                  "software version mismatch: master %s, node %s",
1821
                  constants.RELEASE_VERSION, remote_version[1],
1822
                  code=self.ETYPE_WARNING)
1823

    
1824
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1825
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1826
      for hv_name, hv_result in hyp_result.iteritems():
1827
        test = hv_result is not None
1828
        _ErrorIf(test, constants.CV_ENODEHV, node,
1829
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1830

    
1831
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1832
    if ninfo.vm_capable and isinstance(hvp_result, list):
1833
      for item, hv_name, hv_result in hvp_result:
1834
        _ErrorIf(True, constants.CV_ENODEHV, node,
1835
                 "hypervisor %s parameter verify failure (source %s): %s",
1836
                 hv_name, item, hv_result)
1837

    
1838
    test = nresult.get(constants.NV_NODESETUP,
1839
                       ["Missing NODESETUP results"])
1840
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1841
             "; ".join(test))
1842

    
1843
    return True
1844

    
1845
  def _VerifyNodeTime(self, ninfo, nresult,
1846
                      nvinfo_starttime, nvinfo_endtime):
1847
    """Check the node time.
1848

1849
    @type ninfo: L{objects.Node}
1850
    @param ninfo: the node to check
1851
    @param nresult: the remote results for the node
1852
    @param nvinfo_starttime: the start time of the RPC call
1853
    @param nvinfo_endtime: the end time of the RPC call
1854

1855
    """
1856
    node = ninfo.name
1857
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1858

    
1859
    ntime = nresult.get(constants.NV_TIME, None)
1860
    try:
1861
      ntime_merged = utils.MergeTime(ntime)
1862
    except (ValueError, TypeError):
1863
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1864
      return
1865

    
1866
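    # The node clock is accepted if the merged timestamp falls inside the
    # [start, end] window of our RPC call, padded by NODE_MAX_CLOCK_SKEW on
    # both sides; otherwise the (approximate) divergence is reported below.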
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1867
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1868
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1869
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1870
    else:
1871
      ntime_diff = None
1872

    
1873
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1874
             "Node time diverges by at least %s from master node time",
1875
             ntime_diff)
1876

    
1877
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1878
    """Check the node LVM results.
1879

1880
    @type ninfo: L{objects.Node}
1881
    @param ninfo: the node to check
1882
    @param nresult: the remote results for the node
1883
    @param vg_name: the configured VG name
1884

1885
    """
1886
    if vg_name is None:
1887
      return
1888

    
1889
    node = ninfo.name
1890
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1891

    
1892
    # checks vg existence and size > 20G
1893
    vglist = nresult.get(constants.NV_VGLIST, None)
1894
    test = not vglist
1895
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1896
    if not test:
1897
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1898
                                            constants.MIN_VG_SIZE)
1899
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1900

    
1901
    # check pv names
1902
    pvlist = nresult.get(constants.NV_PVLIST, None)
1903
    test = pvlist is None
1904
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
1905
    if not test:
1906
      # check that ':' is not present in PV names, since it's a
1907
      # special character for lvcreate (denotes the range of PEs to
1908
      # use on the PV)
1909
      for _, pvname, owner_vg in pvlist:
1910
        test = ":" in pvname
1911
        _ErrorIf(test, constants.CV_ENODELVM, node,
1912
                 "Invalid character ':' in PV '%s' of VG '%s'",
1913
                 pvname, owner_vg)
1914

    
1915
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1916
    """Check the node bridges.
1917

1918
    @type ninfo: L{objects.Node}
1919
    @param ninfo: the node to check
1920
    @param nresult: the remote results for the node
1921
    @param bridges: the expected list of bridges
1922

1923
    """
1924
    if not bridges:
1925
      return
1926

    
1927
    node = ninfo.name
1928
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1929

    
1930
    missing = nresult.get(constants.NV_BRIDGES, None)
1931
    test = not isinstance(missing, list)
1932
    _ErrorIf(test, constants.CV_ENODENET, node,
1933
             "did not return valid bridge information")
1934
    if not test:
1935
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1936
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1937

    
1938
  def _VerifyNodeNetwork(self, ninfo, nresult):
1939
    """Check the node network connectivity results.
1940

1941
    @type ninfo: L{objects.Node}
1942
    @param ninfo: the node to check
1943
    @param nresult: the remote results for the node
1944

1945
    """
1946
    node = ninfo.name
1947
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1948

    
1949
    test = constants.NV_NODELIST not in nresult
1950
    _ErrorIf(test, constants.CV_ENODESSH, node,
1951
             "node hasn't returned node ssh connectivity data")
1952
    if not test:
1953
      if nresult[constants.NV_NODELIST]:
1954
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1955
          _ErrorIf(True, constants.CV_ENODESSH, node,
1956
                   "ssh communication with node '%s': %s", a_node, a_msg)
1957

    
1958
    test = constants.NV_NODENETTEST not in nresult
1959
    _ErrorIf(test, constants.CV_ENODENET, node,
1960
             "node hasn't returned node tcp connectivity data")
1961
    if not test:
1962
      if nresult[constants.NV_NODENETTEST]:
1963
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1964
        for anode in nlist:
1965
          _ErrorIf(True, constants.CV_ENODENET, node,
1966
                   "tcp communication with node '%s': %s",
1967
                   anode, nresult[constants.NV_NODENETTEST][anode])
1968

    
1969
    test = constants.NV_MASTERIP not in nresult
1970
    _ErrorIf(test, constants.CV_ENODENET, node,
1971
             "node hasn't returned node master IP reachability data")
1972
    if not test:
1973
      if not nresult[constants.NV_MASTERIP]:
1974
        if node == self.master_node:
1975
          msg = "the master node cannot reach the master IP (not configured?)"
1976
        else:
1977
          msg = "cannot reach the master IP"
1978
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1979

    
1980
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1981
                      diskstatus):
1982
    """Verify an instance.
1983

1984
    This function checks to see if the required block devices are
1985
    available on the instance's node.
1986

1987
    """
1988
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1989
    node_current = instanceconfig.primary_node
1990

    
1991
    node_vol_should = {}
1992
    instanceconfig.MapLVsByNode(node_vol_should)
1993

    
1994
    for node in node_vol_should:
1995
      n_img = node_image[node]
1996
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1997
        # ignore missing volumes on offline or broken nodes
1998
        continue
1999
      for volume in node_vol_should[node]:
2000
        test = volume not in n_img.volumes
2001
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
2002
                 "volume %s missing on node %s", volume, node)
2003

    
2004
    if instanceconfig.admin_up:
2005
      pri_img = node_image[node_current]
2006
      test = instance not in pri_img.instances and not pri_img.offline
2007
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2008
               "instance not running on its primary node %s",
2009
               node_current)
2010

    
2011
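    # Flatten the per-node disk status into (node, success, status, index)
    # tuples so that each disk can be checked individually below.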
    diskdata = [(nname, success, status, idx)
2012
                for (nname, disks) in diskstatus.items()
2013
                for idx, (success, status) in enumerate(disks)]
2014

    
2015
    for nname, success, bdev_status, idx in diskdata:
2016
      # the 'ghost node' construction in Exec() ensures that we have a
2017
      # node here
2018
      snode = node_image[nname]
2019
      bad_snode = snode.ghost or snode.offline
2020
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
2021
               constants.CV_EINSTANCEFAULTYDISK, instance,
2022
               "couldn't retrieve status for disk/%s on %s: %s",
2023
               idx, nname, bdev_status)
2024
      _ErrorIf((instanceconfig.admin_up and success and
2025
                bdev_status.ldisk_status == constants.LDS_FAULTY),
2026
               constants.CV_EINSTANCEFAULTYDISK, instance,
2027
               "disk/%s on %s is faulty", idx, nname)
2028

    
2029
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2030
    """Verify if there are any unknown volumes in the cluster.
2031

2032
    The .os, .swap and backup volumes are ignored. All other volumes are
2033
    reported as unknown.
2034

2035
    @type reserved: L{ganeti.utils.FieldSet}
2036
    @param reserved: a FieldSet of reserved volume names
2037

2038
    """
2039
    for node, n_img in node_image.items():
2040
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2041
        # skip non-healthy nodes
2042
        continue
2043
      for volume in n_img.volumes:
2044
        test = ((node not in node_vol_should or
2045
                volume not in node_vol_should[node]) and
2046
                not reserved.Matches(volume))
2047
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2048
                      "volume %s is unknown", volume)
2049

    
2050
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2051
    """Verify N+1 Memory Resilience.
2052

2053
    Check that if one single node dies we can still start all the
2054
    instances it was primary for.
2055

2056
    """
2057
    cluster_info = self.cfg.GetClusterInfo()
2058
    for node, n_img in node_image.items():
2059
      # This code checks that every node which is now listed as
2060
      # secondary has enough memory to host all instances it is
2061
      # supposed to should a single other node in the cluster fail.
2062
      # FIXME: not ready for failover to an arbitrary node
2063
      # FIXME: does not support file-backed instances
2064
      # WARNING: we currently take into account down instances as well
2065
      # as up ones, considering that even if they're down someone
2066
      # might want to start them even in the event of a node failure.
2067
      if n_img.offline:
2068
        # we're skipping offline nodes from the N+1 warning, since
2069
        # most likely we don't have good memory information from them;
2070
        # we already list instances living on such nodes, and that's
2071
        # enough warning
2072
        continue
2073
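      # For every primary node that uses this node as a secondary, sum the
      # memory of its auto-balanced instances and check that this node could
      # host them all, should that primary node fail.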
      for prinode, instances in n_img.sbp.items():
2074
        needed_mem = 0
2075
        for instance in instances:
2076
          bep = cluster_info.FillBE(instance_cfg[instance])
2077
          if bep[constants.BE_AUTO_BALANCE]:
2078
            needed_mem += bep[constants.BE_MEMORY]
2079
        test = n_img.mfree < needed_mem
2080
        self._ErrorIf(test, constants.CV_ENODEN1, node,
2081
                      "not enough memory to accomodate instance failovers"
2082
                      " should node %s fail (%dMiB needed, %dMiB available)",
2083
                      prinode, needed_mem, n_img.mfree)
2084

    
2085
  @classmethod
2086
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2087
                   (files_all, files_opt, files_mc, files_vm)):
2088
    """Verifies file checksums collected from all nodes.
2089

2090
    @param errorif: Callback for reporting errors
2091
    @param nodeinfo: List of L{objects.Node} objects
2092
    @param master_node: Name of master node
2093
    @param all_nvinfo: RPC results
2094

2095
    """
2096
    # Define functions determining which nodes to consider for a file
2097
    files2nodefn = [
2098
      (files_all, None),
2099
      (files_mc, lambda node: (node.master_candidate or
2100
                               node.name == master_node)),
2101
      (files_vm, lambda node: node.vm_capable),
2102
      ]
2103

    
2104
    # Build mapping from filename to list of nodes which should have the file
2105
    nodefiles = {}
2106
    for (files, fn) in files2nodefn:
2107
      if fn is None:
2108
        filenodes = nodeinfo
2109
      else:
2110
        filenodes = filter(fn, nodeinfo)
2111
      nodefiles.update((filename,
2112
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2113
                       for filename in files)
2114

    
2115
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2116

    
2117
    fileinfo = dict((filename, {}) for filename in nodefiles)
2118
    ignore_nodes = set()
2119

    
2120
    for node in nodeinfo:
2121
      if node.offline:
2122
        ignore_nodes.add(node.name)
2123
        continue
2124

    
2125
      nresult = all_nvinfo[node.name]
2126

    
2127
      if nresult.fail_msg or not nresult.payload:
2128
        node_files = None
2129
      else:
2130
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2131

    
2132
      test = not (node_files and isinstance(node_files, dict))
2133
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
2134
              "Node did not return file checksum data")
2135
      if test:
2136
        ignore_nodes.add(node.name)
2137
        continue
2138

    
2139
      # Build per-checksum mapping from filename to nodes having it
2140
      for (filename, checksum) in node_files.items():
2141
        assert filename in nodefiles
2142
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2143

    
2144
    for (filename, checksums) in fileinfo.items():
2145
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2146

    
2147
      # Nodes having the file
2148
      with_file = frozenset(node_name
2149
                            for nodes in fileinfo[filename].values()
2150
                            for node_name in nodes) - ignore_nodes
2151

    
2152
      expected_nodes = nodefiles[filename] - ignore_nodes
2153

    
2154
      # Nodes missing file
2155
      missing_file = expected_nodes - with_file
2156

    
2157
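      # An optional file must be present either on all expected nodes or on
      # none of them; partial presence indicates an inconsistent cluster.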
      if filename in files_opt:
2158
        # All or no nodes
2159
        errorif(missing_file and missing_file != expected_nodes,
2160
                constants.CV_ECLUSTERFILECHECK, None,
2161
                "File %s is optional, but it must exist on all or no"
2162
                " nodes (not found on %s)",
2163
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2164
      else:
2165
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2166
                "File %s is missing from node(s) %s", filename,
2167
                utils.CommaJoin(utils.NiceSort(missing_file)))
2168

    
2169
        # Warn if a node has a file it shouldn't
2170
        unexpected = with_file - expected_nodes
2171
        errorif(unexpected,
2172
                constants.CV_ECLUSTERFILECHECK, None,
2173
                "File %s should not exist on node(s) %s",
2174
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2175

    
2176
      # See if there are multiple versions of the file
2177
      test = len(checksums) > 1
2178
      if test:
2179
        variants = ["variant %s on %s" %
2180
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2181
                    for (idx, (checksum, nodes)) in
2182
                      enumerate(sorted(checksums.items()))]
2183
      else:
2184
        variants = []
2185

    
2186
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2187
              "File %s found with %s different checksums (%s)",
2188
              filename, len(checksums), "; ".join(variants))
2189

    
2190
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2191
                      drbd_map):
2192
    """Verifies and the node DRBD status.
2193

2194
    @type ninfo: L{objects.Node}
2195
    @param ninfo: the node to check
2196
    @param nresult: the remote results for the node
2197
    @param instanceinfo: the dict of instances
2198
    @param drbd_helper: the configured DRBD usermode helper
2199
    @param drbd_map: the DRBD map as returned by
2200
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2201

2202
    """
2203
    node = ninfo.name
2204
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2205

    
2206
    if drbd_helper:
2207
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2208
      test = (helper_result is None)
2209
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2210
               "no drbd usermode helper returned")
2211
      if helper_result:
2212
        status, payload = helper_result
2213
        test = not status
2214
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2215
                 "drbd usermode helper check unsuccessful: %s", payload)
2216
        test = status and (payload != drbd_helper)
2217
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2218
                 "wrong drbd usermode helper: %s", payload)
2219

    
2220
    # compute the DRBD minors
2221
    node_drbd = {}
2222
    for minor, instance in drbd_map[node].items():
2223
      test = instance not in instanceinfo
2224
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2225
               "ghost instance '%s' in temporary DRBD map", instance)
2226
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
2229
      if test:
2230
        node_drbd[minor] = (instance, False)
2231
      else:
2232
        instance = instanceinfo[instance]
2233
        node_drbd[minor] = (instance.name, instance.admin_up)
2234

    
2235
    # and now check them
2236
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2237
    test = not isinstance(used_minors, (tuple, list))
2238
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2239
             "cannot parse drbd status file: %s", str(used_minors))
2240
    if test:
2241
      # we cannot check drbd status
2242
      return
2243

    
2244
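    # Cross-check both directions: every minor the configuration expects to
    # be active must be in use on the node, and every minor the node uses
    # must be known to the configuration.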
    for minor, (iname, must_exist) in node_drbd.items():
2245
      test = minor not in used_minors and must_exist
2246
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2247
               "drbd minor %d of instance %s is not active", minor, iname)
2248
    for minor in used_minors:
2249
      test = minor not in node_drbd
2250
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2251
               "unallocated drbd minor %d is in use", minor)
2252

    
2253
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2254
    """Builds the node OS structures.
2255

2256
    @type ninfo: L{objects.Node}
2257
    @param ninfo: the node to check
2258
    @param nresult: the remote results for the node
2259
    @param nimg: the node image object
2260

2261
    """
2262
    node = ninfo.name
2263
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2264

    
2265
    remote_os = nresult.get(constants.NV_OSLIST, None)
2266
    test = (not isinstance(remote_os, list) or
2267
            not compat.all(isinstance(v, list) and len(v) == 7
2268
                           for v in remote_os))
2269

    
2270
    _ErrorIf(test, constants.CV_ENODEOS, node,
2271
             "node hasn't returned valid OS data")
2272

    
2273
    nimg.os_fail = test
2274

    
2275
    if test:
2276
      return
2277

    
2278
    os_dict = {}
2279

    
2280
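    # Each entry reported by DiagnoseOS is (name, path, status, diagnose
    # message, variants, parameters, api_versions); entries are grouped per
    # OS name, since the same OS may be found under several search paths.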
    for (name, os_path, status, diagnose,
2281
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2282

    
2283
      if name not in os_dict:
2284
        os_dict[name] = []
2285

    
2286
      # parameters is a list of lists instead of list of tuples due to
2287
      # JSON lacking a real tuple type, fix it:
2288
      parameters = [tuple(v) for v in parameters]
2289
      os_dict[name].append((os_path, status, diagnose,
2290
                            set(variants), set(parameters), set(api_ver)))
2291

    
2292
    nimg.oslist = os_dict
2293

    
2294
  def _VerifyNodeOS(self, ninfo, nimg, base):
2295
    """Verifies the node OS list.
2296

2297
    @type ninfo: L{objects.Node}
2298
    @param ninfo: the node to check
2299
    @param nimg: the node image object
2300
    @param base: the 'template' node we match against (e.g. from the master)
2301

2302
    """
2303
    node = ninfo.name
2304
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2305

    
2306
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2307

    
2308
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2309
    for os_name, os_data in nimg.oslist.items():
2310
      assert os_data, "Empty OS status for OS %s?!" % os_name
2311
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2312
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2313
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2314
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2315
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2316
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2317
      # comparisons with the 'base' image
2318
      test = os_name not in base.oslist
2319
      _ErrorIf(test, constants.CV_ENODEOS, node,
2320
               "Extra OS %s not present on reference node (%s)",
2321
               os_name, base.name)
2322
      if test:
2323
        continue
2324
      assert base.oslist[os_name], "Base node has empty OS status?"
2325
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2326
      if not b_status:
2327
        # base OS is invalid, skipping
2328
        continue
2329
      for kind, a, b in [("API version", f_api, b_api),
2330
                         ("variants list", f_var, b_var),
2331
                         ("parameters", beautify_params(f_param),
2332
                          beautify_params(b_param))]:
2333
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2334
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2335
                 kind, os_name, base.name,
2336
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2337

    
2338
    # check any missing OSes
2339
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2340
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2341
             "OSes present on reference node %s but missing on this node: %s",
2342
             base.name, utils.CommaJoin(missing))
2343

    
2344
  def _VerifyOob(self, ninfo, nresult):
2345
    """Verifies out of band functionality of a node.
2346

2347
    @type ninfo: L{objects.Node}
2348
    @param ninfo: the node to check
2349
    @param nresult: the remote results for the node
2350

2351
    """
2352
    node = ninfo.name
2353
    # We just have to verify the paths on master and/or master candidates
2354
    # as the oob helper is invoked on the master
2355
    if ((ninfo.master_candidate or ninfo.master_capable) and
2356
        constants.NV_OOB_PATHS in nresult):
2357
      for path_result in nresult[constants.NV_OOB_PATHS]:
2358
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2359

    
2360
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2361
    """Verifies and updates the node volume data.
2362

2363
    This function will update a L{NodeImage}'s internal structures
2364
    with data from the remote call.
2365

2366
    @type ninfo: L{objects.Node}
2367
    @param ninfo: the node to check
2368
    @param nresult: the remote results for the node
2369
    @param nimg: the node image object
2370
    @param vg_name: the configured VG name
2371

2372
    """
2373
    node = ninfo.name
2374
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2375

    
2376
    nimg.lvm_fail = True
2377
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2378
    if vg_name is None:
2379
      pass
2380
    elif isinstance(lvdata, basestring):
2381
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2382
               utils.SafeEncode(lvdata))
2383
    elif not isinstance(lvdata, dict):
2384
      _ErrorIf(True, constants.CV_ENODELVM, node,
2385
               "rpc call to node failed (lvlist)")
2386
    else:
2387
      nimg.volumes = lvdata
2388
      nimg.lvm_fail = False
2389

    
2390
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2391
    """Verifies and updates the node instance list.
2392

2393
    If the listing was successful, then updates this node's instance
2394
    list. Otherwise, it marks the RPC call as failed for the instance
2395
    list key.
2396

2397
    @type ninfo: L{objects.Node}
2398
    @param ninfo: the node to check
2399
    @param nresult: the remote results for the node
2400
    @param nimg: the node image object
2401

2402
    """
2403
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2404
    test = not isinstance(idata, list)
2405
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2406
                  "rpc call to node failed (instancelist): %s",
2407
                  utils.SafeEncode(str(idata)))
2408
    if test:
2409
      nimg.hyp_fail = True
2410
    else:
2411
      nimg.instances = idata
2412

    
2413
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2414
    """Verifies and computes a node information map
2415

2416
    @type ninfo: L{objects.Node}
2417
    @param ninfo: the node to check
2418
    @param nresult: the remote results for the node
2419
    @param nimg: the node image object
2420
    @param vg_name: the configured VG name
2421

2422
    """
2423
    node = ninfo.name
2424
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2425

    
2426
    # try to read free memory (from the hypervisor)
2427
    hv_info = nresult.get(constants.NV_HVINFO, None)
2428
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2429
    _ErrorIf(test, constants.CV_ENODEHV, node,
2430
             "rpc call to node failed (hvinfo)")
2431
    if not test:
2432
      try:
2433
        nimg.mfree = int(hv_info["memory_free"])
2434
      except (ValueError, TypeError):
2435
        _ErrorIf(True, constants.CV_ENODERPC, node,
2436
                 "node returned invalid nodeinfo, check hypervisor")
2437

    
2438
    # FIXME: devise a free space model for file based instances as well
2439
    if vg_name is not None:
2440
      test = (constants.NV_VGLIST not in nresult or
2441
              vg_name not in nresult[constants.NV_VGLIST])
2442
      _ErrorIf(test, constants.CV_ENODELVM, node,
2443
               "node didn't return data for the volume group '%s'"
2444
               " - it is either missing or broken", vg_name)
2445
      if not test:
2446
        try:
2447
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2448
        except (ValueError, TypeError):
2449
          _ErrorIf(True, constants.CV_ENODERPC, node,
2450
                   "node returned invalid LVM info, check LVM status")
2451

    
2452
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2453
    """Gets per-disk status information for all instances.
2454

2455
    @type nodelist: list of strings
2456
    @param nodelist: Node names
2457
    @type node_image: dict of (name, L{objects.Node})
2458
    @param node_image: Node objects
2459
    @type instanceinfo: dict of (name, L{objects.Instance})
2460
    @param instanceinfo: Instance objects
2461
    @rtype: {instance: {node: [(success, payload)]}}
2462
    @return: a dictionary of per-instance dictionaries with nodes as
2463
        keys and disk information as values; the disk information is a
2464
        list of tuples (success, payload)
2465

2466
    """
2467
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2468

    
2469
    node_disks = {}
2470
    node_disks_devonly = {}
2471
    diskless_instances = set()
2472
    diskless = constants.DT_DISKLESS
2473

    
2474
    for nname in nodelist:
2475
      node_instances = list(itertools.chain(node_image[nname].pinst,
2476
                                            node_image[nname].sinst))
2477
      diskless_instances.update(inst for inst in node_instances
2478
                                if instanceinfo[inst].disk_template == diskless)
2479
      disks = [(inst, disk)
2480
               for inst in node_instances
2481
               for disk in instanceinfo[inst].disks]
2482

    
2483
      if not disks:
2484
        # No need to collect data
2485
        continue
2486

    
2487
      node_disks[nname] = disks
2488

    
2489
      # Creating copies as SetDiskID below will modify the objects and that can
2490
      # lead to incorrect data returned from nodes
2491
      devonly = [dev.Copy() for (_, dev) in disks]
2492

    
2493
      for dev in devonly:
2494
        self.cfg.SetDiskID(dev, nname)
2495

    
2496
      node_disks_devonly[nname] = devonly
2497

    
2498
    assert len(node_disks) == len(node_disks_devonly)
2499

    
2500
    # Collect data from all nodes with disks
2501
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2502
                                                          node_disks_devonly)
2503

    
2504
    assert len(result) == len(node_disks)
2505

    
2506
    instdisk = {}
2507

    
2508
    for (nname, nres) in result.items():
2509
      disks = node_disks[nname]
2510

    
2511
      if nres.offline:
2512
        # No data from this node
2513
        data = len(disks) * [(False, "node offline")]
2514
      else:
2515
        msg = nres.fail_msg
2516
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2517
                 "while getting disk information: %s", msg)
2518
        if msg:
2519
          # No data from this node
2520
          data = len(disks) * [(False, msg)]
2521
        else:
2522
          data = []
2523
          for idx, i in enumerate(nres.payload):
2524
            if isinstance(i, (tuple, list)) and len(i) == 2:
2525
              data.append(i)
2526
            else:
2527
              logging.warning("Invalid result from node %s, entry %d: %s",
2528
                              nname, idx, i)
2529
              data.append((False, "Invalid result from the remote node"))
2530

    
2531
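      # Map every per-disk result back to its owning instance; "disks" and
      # "data" are parallel lists with one entry per (instance, disk) pair.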
      for ((inst, _), status) in zip(disks, data):
2532
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2533

    
2534
    # Add empty entries for diskless instances.
2535
    for inst in diskless_instances:
2536
      assert inst not in instdisk
2537
      instdisk[inst] = {}
2538

    
2539
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2540
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2541
                      compat.all(isinstance(s, (tuple, list)) and
2542
                                 len(s) == 2 for s in statuses)
2543
                      for inst, nnames in instdisk.items()
2544
                      for nname, statuses in nnames.items())
2545
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2546

    
2547
    return instdisk
2548

    
2549
  @staticmethod
2550
  def _SshNodeSelector(group_uuid, all_nodes):
2551
    """Create endless iterators for all potential SSH check hosts.
2552

2553
    """
2554
    nodes = [node for node in all_nodes
2555
             if (node.group != group_uuid and
2556
                 not node.offline)]
2557
    keyfunc = operator.attrgetter("group")
2558

    
2559
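    # One endless (cycled) iterator of sorted node names per foreign group;
    # _SelectSshCheckNodes draws one name from each iterator for every online
    # node, spreading the SSH checks across the other groups.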
    return map(itertools.cycle,
2560
               [sorted(map(operator.attrgetter("name"), names))
2561
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2562
                                                  keyfunc)])
2563

    
2564
  @classmethod
2565
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2566
    """Choose which nodes should talk to which other nodes.
2567

2568
    We will make nodes contact all nodes in their group, and one node from
2569
    every other group.
2570

2571
    @warning: This algorithm has a known issue if one node group is much
2572
      smaller than others (e.g. just one node). In such a case all other
2573
      nodes will talk to the single node.
2574

2575
    """
2576
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2577
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2578

    
2579
    return (online_nodes,
2580
            dict((name, sorted([i.next() for i in sel]))
2581
                 for name in online_nodes))
2582

    
2583
  def BuildHooksEnv(self):
2584
    """Build hooks env.
2585

2586
    Cluster-Verify hooks are run in the post phase only; their failure is
    logged in the verify output and makes the verification fail.
2588

2589
    """
2590
    env = {
2591
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2592
      }
2593

    
2594
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2595
               for node in self.my_node_info.values())
2596

    
2597
    return env
2598

    
2599
  def BuildHooksNodes(self):
2600
    """Build hooks nodes.
2601

2602
    """
2603
    return ([], self.my_node_names)
2604

    
2605
  def Exec(self, feedback_fn):
2606
    """Verify integrity of the node group, performing various test on nodes.
2607

2608
    """
2609
    # This method has too many local variables. pylint: disable=R0914
2610
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2611

    
2612
    if not self.my_node_names:
2613
      # empty node group
2614
      feedback_fn("* Empty node group, skipping verification")
2615
      return True
2616

    
2617
    self.bad = False
2618
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2619
    verbose = self.op.verbose
2620
    self._feedback_fn = feedback_fn
2621

    
2622
    vg_name = self.cfg.GetVGName()
2623
    drbd_helper = self.cfg.GetDRBDHelper()
2624
    cluster = self.cfg.GetClusterInfo()
2625
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2626
    hypervisors = cluster.enabled_hypervisors
2627
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2628

    
2629
    i_non_redundant = [] # Non redundant instances
2630
    i_non_a_balanced = [] # Non auto-balanced instances
2631
    n_offline = 0 # Count of offline nodes
2632
    n_drained = 0 # Count of nodes being drained
2633
    node_vol_should = {}
2634

    
2635
    # FIXME: verify OS list
2636

    
2637
    # File verification
2638
    filemap = _ComputeAncillaryFiles(cluster, False)
2639

    
2640
    # do local checksums
2641
    master_node = self.master_node = self.cfg.GetMasterNode()
2642
    master_ip = self.cfg.GetMasterIP()
2643

    
2644
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2645

    
2646
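    # Parameters for the node_verify RPC: each key selects one class of
    # remote checks and its value carries the data that check needs.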
    node_verify_param = {
2647
      constants.NV_FILELIST:
2648
        utils.UniqueSequence(filename
2649
                             for files in filemap
2650
                             for filename in files),
2651
      constants.NV_NODELIST:
2652
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2653
                                  self.all_node_info.values()),
2654
      constants.NV_HYPERVISOR: hypervisors,
2655
      constants.NV_HVPARAMS:
2656
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2657
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2658
                                 for node in node_data_list
2659
                                 if not node.offline],
2660
      constants.NV_INSTANCELIST: hypervisors,
2661
      constants.NV_VERSION: None,
2662
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2663
      constants.NV_NODESETUP: None,
2664
      constants.NV_TIME: None,
2665
      constants.NV_MASTERIP: (master_node, master_ip),
2666
      constants.NV_OSLIST: None,
2667
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2668
      }
2669

    
2670
    if vg_name is not None:
2671
      node_verify_param[constants.NV_VGLIST] = None
2672
      node_verify_param[constants.NV_LVLIST] = vg_name
2673
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2674
      node_verify_param[constants.NV_DRBDLIST] = None
2675

    
2676
    if drbd_helper:
2677
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2678

    
2679
    # bridge checks
2680
    # FIXME: this needs to be changed per node-group, not cluster-wide
2681
    bridges = set()
2682
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2683
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2684
      bridges.add(default_nicpp[constants.NIC_LINK])
2685
    for instance in self.my_inst_info.values():
2686
      for nic in instance.nics:
2687
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2688
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2689
          bridges.add(full_nic[constants.NIC_LINK])
2690

    
2691
    if bridges:
2692
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2693

    
2694
    # Build our expected cluster state
2695
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2696
                                                 name=node.name,
2697
                                                 vm_capable=node.vm_capable))
2698
                      for node in node_data_list)
2699

    
2700
    # Gather OOB paths
2701
    oob_paths = []
2702
    for node in self.all_node_info.values():
2703
      path = _SupportsOob(self.cfg, node)
2704
      if path and path not in oob_paths:
2705
        oob_paths.append(path)
2706

    
2707
    if oob_paths:
2708
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2709

    
2710
    for instance in self.my_inst_names:
2711
      inst_config = self.my_inst_info[instance]
2712

    
2713
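      # Instance nodes outside this group get a stub NodeImage; nodes that are
      # missing from the cluster configuration entirely are flagged as "ghost"
      # nodes.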
      for nname in inst_config.all_nodes:
2714
        if nname not in node_image:
2715
          gnode = self.NodeImage(name=nname)
2716
          gnode.ghost = (nname not in self.all_node_info)
2717
          node_image[nname] = gnode
2718

    
2719
      inst_config.MapLVsByNode(node_vol_should)
2720

    
2721
      pnode = inst_config.primary_node
2722
      node_image[pnode].pinst.append(instance)
2723

    
2724
      for snode in inst_config.secondary_nodes:
2725
        nimg = node_image[snode]
2726
        nimg.sinst.append(instance)
2727
        if pnode not in nimg.sbp:
2728
          nimg.sbp[pnode] = []
2729
        nimg.sbp[pnode].append(instance)
2730

    
2731
    # At this point, we have the in-memory data structures complete,
2732
    # except for the runtime information, which we'll gather next
2733

    
2734
    # Due to the way our RPC system works, exact response times cannot be
2735
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2736
    # time before and after executing the request, we can at least have a time
2737
    # window.
2738
    nvinfo_starttime = time.time()
2739
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2740
                                           node_verify_param,
2741
                                           self.cfg.GetClusterName())
2742
    nvinfo_endtime = time.time()
2743

    
2744
    if self.extra_lv_nodes and vg_name is not None:
2745
      extra_lv_nvinfo = \
2746
          self.rpc.call_node_verify(self.extra_lv_nodes,
2747
                                    {constants.NV_LVLIST: vg_name},
2748
                                    self.cfg.GetClusterName())
2749
    else:
2750
      extra_lv_nvinfo = {}
2751

    
2752
    all_drbd_map = self.cfg.ComputeDRBDMap()
2753

    
2754
    feedback_fn("* Gathering disk information (%s nodes)" %
2755
                len(self.my_node_names))
2756
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2757
                                     self.my_inst_info)
2758

    
2759
    feedback_fn("* Verifying configuration file consistency")
2760

    
2761
    # If not all nodes are being checked, we need to make sure the master node
2762
    # and a non-checked vm_capable node are in the list.
2763
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2764
    if absent_nodes:
2765
      vf_nvinfo = all_nvinfo.copy()
2766
      vf_node_info = list(self.my_node_info.values())
2767
      additional_nodes = []
2768
      if master_node not in self.my_node_info:
2769
        additional_nodes.append(master_node)
2770
        vf_node_info.append(self.all_node_info[master_node])
2771
      # Add the first vm_capable node we find which is not included
2772
      for node in absent_nodes:
2773
        nodeinfo = self.all_node_info[node]
2774
        if nodeinfo.vm_capable and not nodeinfo.offline:
2775
          additional_nodes.append(node)
2776
          vf_node_info.append(self.all_node_info[node])
2777
          break
2778
      key = constants.NV_FILELIST
2779
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2780
                                                 {key: node_verify_param[key]},
2781
                                                 self.cfg.GetClusterName()))
2782
    else:
2783
      vf_nvinfo = all_nvinfo
2784
      vf_node_info = self.my_node_info.values()
2785

    
2786
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2787

    
2788
    feedback_fn("* Verifying node status")
2789

    
2790
    refos_img = None
2791

    
2792
    for node_i in node_data_list:
2793
      node = node_i.name
2794
      nimg = node_image[node]
2795

    
2796
      if node_i.offline:
2797
        if verbose:
2798
          feedback_fn("* Skipping offline node %s" % (node,))
2799
        n_offline += 1
2800
        continue
2801

    
2802
      if node == master_node:
2803
        ntype = "master"
2804
      elif node_i.master_candidate:
2805
        ntype = "master candidate"
2806
      elif node_i.drained:
2807
        ntype = "drained"
2808
        n_drained += 1
2809
      else:
2810
        ntype = "regular"
2811
      if verbose:
2812
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2813

    
2814
      msg = all_nvinfo[node].fail_msg
2815
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2816
               msg)
2817
      if msg:
2818
        nimg.rpc_fail = True
2819
        continue
2820

    
2821
      nresult = all_nvinfo[node].payload
2822

    
2823
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2824
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2825
      self._VerifyNodeNetwork(node_i, nresult)
2826
      self._VerifyOob(node_i, nresult)
2827

    
2828
      if nimg.vm_capable:
2829
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2830
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2831
                             all_drbd_map)
2832

    
2833
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2834
        self._UpdateNodeInstances(node_i, nresult, nimg)
2835
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2836
        self._UpdateNodeOS(node_i, nresult, nimg)
2837

    
2838
        if not nimg.os_fail:
2839
          if refos_img is None:
2840
            refos_img = nimg
2841
          self._VerifyNodeOS(node_i, nimg, refos_img)
2842
        self._VerifyNodeBridges(node_i, nresult, bridges)
2843

    
2844
        # Check whether all running instances are primary for the node. (This
2845
        # can no longer be done from _VerifyInstance below, since some of the
2846
        # wrong instances could be from other node groups.)
2847
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2848

    
2849
        for inst in non_primary_inst:
2850
          test = inst in self.all_inst_info
2851
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2852
                   "instance should not run on node %s", node_i.name)
2853
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2854
                   "node is running unknown instance %s", inst)
2855

    
2856
    for node, result in extra_lv_nvinfo.items():
2857
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2858
                              node_image[node], vg_name)
2859

    
2860
    feedback_fn("* Verifying instance status")
2861
    for instance in self.my_inst_names:
2862
      if verbose:
2863
        feedback_fn("* Verifying instance %s" % instance)
2864
      inst_config = self.my_inst_info[instance]
2865
      self._VerifyInstance(instance, inst_config, node_image,
2866
                           instdisk[instance])
2867
      inst_nodes_offline = []
2868

    
2869
      pnode = inst_config.primary_node
2870
      pnode_img = node_image[pnode]
2871
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2872
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2873
               " primary node failed", instance)
2874

    
2875
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2876
               constants.CV_EINSTANCEBADNODE, instance,
2877
               "instance is marked as running and lives on offline node %s",
2878
               inst_config.primary_node)
2879

    
      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary, so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)
2887

    
2888
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2889
               constants.CV_EINSTANCELAYOUT,
2890
               instance, "instance has multiple secondary nodes: %s",
2891
               utils.CommaJoin(inst_config.secondary_nodes),
2892
               code=self.ETYPE_WARNING)
2893

    
2894
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2895
        pnode = inst_config.primary_node
2896
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2897
        instance_groups = {}
2898

    
2899
        for node in instance_nodes:
2900
          instance_groups.setdefault(self.all_node_info[node].group,
2901
                                     []).append(node)
2902

    
2903
        pretty_list = [
2904
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2905
          # Sort so that we always list the primary node first.
2906
          for group, nodes in sorted(instance_groups.items(),
2907
                                     key=lambda (_, nodes): pnode in nodes,
2908
                                     reverse=True)]
2909

    
2910
        self._ErrorIf(len(instance_groups) > 1,
2911
                      constants.CV_EINSTANCESPLITGROUPS,
2912
                      instance, "instance has primary and secondary nodes in"
2913
                      " different groups: %s", utils.CommaJoin(pretty_list),
2914
                      code=self.ETYPE_WARNING)
2915

    
2916
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2917
        i_non_a_balanced.append(instance)
2918

    
2919
      for snode in inst_config.secondary_nodes:
2920
        s_img = node_image[snode]
2921
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2922
                 snode, "instance %s, connection to secondary node failed",
2923
                 instance)
2924

    
2925
        if s_img.offline:
2926
          inst_nodes_offline.append(snode)
2927

    
2928
      # warn that the instance lives on offline nodes
2929
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2930
               "instance has offline secondary node(s) %s",
2931
               utils.CommaJoin(inst_nodes_offline))
2932
      # ... or ghost/non-vm_capable nodes
2933
      for node in inst_config.all_nodes:
2934
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2935
                 instance, "instance lives on ghost node %s", node)
2936
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2937
                 instance, "instance lives on non-vm_capable node %s", node)
2938

    
2939
    feedback_fn("* Verifying orphan volumes")
2940
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2941

    
2942
    # We will get spurious "unknown volume" warnings if any node of this group
2943
    # is secondary for an instance whose primary is in another group. To avoid
2944
    # them, we find these instances and add their volumes to node_vol_should.
2945
    for inst in self.all_inst_info.values():
2946
      for secondary in inst.secondary_nodes:
2947
        if (secondary in self.my_node_info
2948
            and inst.name not in self.my_inst_info):
2949
          inst.MapLVsByNode(node_vol_should)
2950
          break
2951

    
2952
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2953

    
2954
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2955
      feedback_fn("* Verifying N+1 Memory redundancy")
2956
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2957

    
2958
    feedback_fn("* Other Notes")
2959
    if i_non_redundant:
2960
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2961
                  % len(i_non_redundant))
2962

    
2963
    if i_non_a_balanced:
2964
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2965
                  % len(i_non_a_balanced))
2966

    
2967
    if n_offline:
2968
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2969

    
2970
    if n_drained:
2971
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2972

    
2973
    return not self.bad
2974

    
2975
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
2991
    # and are only interested in their results
2992
    if not self.my_node_names:
2993
      # empty node group
2994
      pass
2995
    elif phase == constants.HOOKS_PHASE_POST:
2996
      # Used to change hooks' output to proper indentation
2997
      feedback_fn("* Hooks Results")
2998
      assert hooks_results, "invalid result from hooks"
2999

    
3000
      for node_name in hooks_results:
3001
        res = hooks_results[node_name]
3002
        msg = res.fail_msg
3003
        test = msg and not res.offline
3004
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3005
                      "Communication failure in hooks execution: %s", msg)
3006
        if res.offline or msg:
3007
          # No need to investigate payload if node is offline or gave
3008
          # an error.
3009
          continue
3010
        for script, hkr, output in res.payload:
3011
          test = hkr == constants.HKR_FAIL
3012
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3013
                        "Script %s failed, output:", script)
3014
          if test:
3015
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3016
            feedback_fn("%s" % output)
3017
            lu_result = False
3018

    
3019
    return lu_result
3020

    
3021

    
3022
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks' status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
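  # Illustrative sketch (not part of the original module; group names below are
  # hypothetical): for a cluster with two node groups, the value handed back to
  # mcpu is equivalent to
  #
  #   ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name="default")],
  #                   [opcodes.OpGroupVerifyDisks(group_name="storage")]])
  #
  # i.e. one single-opcode job per group, which mcpu then submits separately.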


class LUGroupVerifyDisks(NoHooksLU):
3043
  """Verifies the status of all disks in a node group.
3044

3045
  """
3046
  REQ_BGL = False
3047

    
3048
  def ExpandNames(self):
3049
    # Raises errors.OpPrereqError on its own if group can't be found
3050
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3051

    
3052
    self.share_locks = _ShareAll()
3053
    self.needed_locks = {
3054
      locking.LEVEL_INSTANCE: [],
3055
      locking.LEVEL_NODEGROUP: [],
3056
      locking.LEVEL_NODE: [],
3057
      }
3058

    
3059
  def DeclareLocks(self, level):
3060
    if level == locking.LEVEL_INSTANCE:
3061
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3062

    
3063
      # Lock instances optimistically, needs verification once node and group
3064
      # locks have been acquired
3065
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3066
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3067

    
3068
    elif level == locking.LEVEL_NODEGROUP:
3069
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3070

    
3071
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3072
        set([self.group_uuid] +
3073
            # Lock all groups used by instances optimistically; this requires
3074
            # going via the node before it's locked, requiring verification
3075
            # later on
3076
            [group_uuid
3077
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3078
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3079

    
3080
    elif level == locking.LEVEL_NODE:
3081
      # This will only lock the nodes in the group to be verified which contain
3082
      # actual instances
3083
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3084
      self._LockInstancesNodes()
3085

    
3086
      # Lock all nodes in group to be verified
3087
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3088
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3089
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3090

    
3091
  def CheckPrereq(self):
3092
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3093
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3094
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3095

    
3096
    assert self.group_uuid in owned_groups
3097

    
3098
    # Check if locked instances are still correct
3099
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3100

    
3101
    # Get instance information
3102
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3103

    
3104
    # Check if node groups for locked instances are still correct
3105
    for (instance_name, inst) in self.instances.items():
3106
      assert owned_nodes.issuperset(inst.all_nodes), \
3107
        "Instance %s's nodes changed while we kept the lock" % instance_name
3108

    
3109
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3110
                                             owned_groups)
3111

    
3112
      assert self.group_uuid in inst_groups, \
3113
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3114

    
3115
  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
3125
    res_instances = set()
3126
    res_missing = {}
3127

    
3128
    nv_dict = _MapInstanceDisksToNodes([inst
3129
                                        for inst in self.instances.values()
3130
                                        if inst.admin_up])
3131

    
3132
    if nv_dict:
3133
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3134
                             set(self.cfg.GetVmCapableNodeList()))
3135

    
3136
      node_lvs = self.rpc.call_lv_list(nodes, [])
3137

    
3138
      for (node, node_res) in node_lvs.items():
3139
        if node_res.offline:
3140
          continue
3141

    
3142
        msg = node_res.fail_msg
3143
        if msg:
3144
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3145
          res_nodes[node] = msg
3146
          continue
3147

    
3148
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3149
          inst = nv_dict.pop((node, lv_name), None)
3150
          if not (lv_online or inst is None):
3151
            res_instances.add(inst)
3152

    
3153
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3154
      # better
3155
      for key, inst in nv_dict.iteritems():
3156
        res_missing.setdefault(inst, []).append(key)
3157

    
3158
    return (res_nodes, list(res_instances), res_missing)
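
  # Shape of the result (illustrative; the node, instance and volume names are
  # hypothetical placeholders):
  #
  #   ({"node2.example.com": "<error message from the LV listing RPC>"},
  #    ["instance1"],
  #    {"instance2": [("node1.example.com", "<vg>/<lv name>")]})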


class LUClusterRepairDiskSizes(NoHooksLU):
3162
  """Verifies the cluster disks sizes.
3163

3164
  """
3165
  REQ_BGL = False
3166

    
3167
  def ExpandNames(self):
3168
    if self.op.instances:
3169
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3170
      self.needed_locks = {
3171
        locking.LEVEL_NODE: [],
3172
        locking.LEVEL_INSTANCE: self.wanted_names,
3173
        }
3174
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3175
    else:
3176
      self.wanted_names = None
3177
      self.needed_locks = {
3178
        locking.LEVEL_NODE: locking.ALL_SET,
3179
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3180
        }
3181
    self.share_locks = _ShareAll()
3182

    
3183
  def DeclareLocks(self, level):
3184
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
3185
      self._LockInstancesNodes(primary_only=True)
3186

    
3187
  def CheckPrereq(self):
3188
    """Check prerequisites.
3189

3190
    This only checks the optional instance list against the existing names.
3191

3192
    """
3193
    if self.wanted_names is None:
3194
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3195

    
3196
    self.wanted_instances = \
3197
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3198

    
3199
  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8, and fixes an issue where the
    children have a smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
3208
    if disk.dev_type == constants.LD_DRBD8:
3209
      assert disk.children, "Empty children for DRBD8?"
3210
      fchild = disk.children[0]
3211
      mismatch = fchild.size < disk.size
3212
      if mismatch:
3213
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3214
                     fchild.size, disk.size)
3215
        fchild.size = disk.size
3216

    
3217
      # and we recurse on this child only, not on the metadev
3218
      return self._EnsureChildSizes(fchild) or mismatch
3219
    else:
3220
      return False
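
  # Illustrative example (hypothetical sizes, in MiB): for a DRBD8 disk of size
  # 10240 whose data child was created with size 10112, _EnsureChildSizes bumps
  # fchild.size to 10240 and returns True, signalling to the caller that the
  # instance configuration needs to be written back.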
  def Exec(self, feedback_fn):
3223
    """Verify the size of cluster disks.
3224

3225
    """
3226
    # TODO: check child disks too
3227
    # TODO: check differences in size between primary/secondary nodes
3228
    per_node_disks = {}
3229
    for instance in self.wanted_instances:
3230
      pnode = instance.primary_node
3231
      if pnode not in per_node_disks:
3232
        per_node_disks[pnode] = []
3233
      for idx, disk in enumerate(instance.disks):
3234
        per_node_disks[pnode].append((instance, idx, disk))
3235

    
3236
    changed = []
3237
    for node, dskl in per_node_disks.items():
3238
      newl = [v[2].Copy() for v in dskl]
3239
      for dsk in newl:
3240
        self.cfg.SetDiskID(dsk, node)
3241
      result = self.rpc.call_blockdev_getsize(node, newl)
3242
      if result.fail_msg:
3243
        self.LogWarning("Failure in blockdev_getsize call to node"
3244
                        " %s, ignoring", node)
3245
        continue
3246
      if len(result.payload) != len(dskl):
3247
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3248
                        " result.payload=%s", node, len(dskl), result.payload)
3249
        self.LogWarning("Invalid result from node %s, ignoring node results",
3250
                        node)
3251
        continue
3252
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3253
        if size is None:
3254
          self.LogWarning("Disk %d of instance %s did not return size"
3255
                          " information, ignoring", idx, instance.name)
3256
          continue
3257
        if not isinstance(size, (int, long)):
3258
          self.LogWarning("Disk %d of instance %s did not return valid"
3259
                          " size information, ignoring", idx, instance.name)
3260
          continue
3261
        size = size >> 20
3262
        if size != disk.size:
3263
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3264
                       " correcting: recorded %d, actual %d", idx,
3265
                       instance.name, disk.size, size)
3266
          disk.size = size
3267
          self.cfg.Update(instance, feedback_fn)
3268
          changed.append((instance.name, idx, size))
3269
        if self._EnsureChildSizes(disk):
3270
          self.cfg.Update(instance, feedback_fn)
3271
          changed.append((instance.name, idx, disk.size))
3272
    return changed
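
  # Worked example of the conversion above (illustrative numbers): the sizes in
  # result.payload are shifted right by 20 bits to obtain MiB, e.g.
  #
  #   10737418240 >> 20 == 10240
  #
  # so a disk recorded with disk.size == 10000 would be corrected to 10240 and
  # reported in the returned list as (instance.name, idx, 10240).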


class LUClusterRename(LogicalUnit):
3276
  """Rename the cluster.
3277

3278
  """
3279
  HPATH = "cluster-rename"
3280
  HTYPE = constants.HTYPE_CLUSTER
3281

    
3282
  def BuildHooksEnv(self):
3283
    """Build hooks env.
3284

3285
    """
3286
    return {
3287
      "OP_TARGET": self.cfg.GetClusterName(),
3288
      "NEW_NAME": self.op.name,
3289
      }
3290

    
3291
  def BuildHooksNodes(self):
3292
    """Build hooks nodes.
3293

3294
    """
3295
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3296

    
3297
  def CheckPrereq(self):
3298
    """Verify that the passed name is a valid one.
3299

3300
    """
3301
    hostname = netutils.GetHostname(name=self.op.name,
3302
                                    family=self.cfg.GetPrimaryIPFamily())
3303

    
3304
    new_name = hostname.name
3305
    self.ip = new_ip = hostname.ip
3306
    old_name = self.cfg.GetClusterName()
3307
    old_ip = self.cfg.GetMasterIP()
3308
    if new_name == old_name and new_ip == old_ip:
3309
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3310
                                 " cluster has changed",
3311
                                 errors.ECODE_INVAL)
3312
    if new_ip != old_ip:
3313
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3314
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3315
                                   " reachable on the network" %
3316
                                   new_ip, errors.ECODE_NOTUNIQUE)
3317

    
3318
    self.op.name = new_name
3319

    
3320
  def Exec(self, feedback_fn):
3321
    """Rename the cluster.
3322

3323
    """
3324
    clustername = self.op.name
3325
    ip = self.ip
3326

    
3327
    # shutdown the master IP
3328
    master = self.cfg.GetMasterNode()
3329
    result = self.rpc.call_node_deactivate_master_ip(master)
3330
    result.Raise("Could not disable the master role")
3331

    
3332
    try:
3333
      cluster = self.cfg.GetClusterInfo()
3334
      cluster.cluster_name = clustername
3335
      cluster.master_ip = ip
3336
      self.cfg.Update(cluster, feedback_fn)
3337

    
3338
      # update the known hosts file
3339
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3340
      node_list = self.cfg.GetOnlineNodeList()
3341
      try:
3342
        node_list.remove(master)
3343
      except ValueError:
3344
        pass
3345
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3346
    finally:
3347
      result = self.rpc.call_node_activate_master_ip(master)
3348
      msg = result.fail_msg
3349
      if msg:
3350
        self.LogWarning("Could not re-enable the master role on"
3351
                        " the master, please restart manually: %s", msg)
3352

    
3353
    return clustername
3354

    
3355

    
3356
def _ValidateNetmask(cfg, netmask):
3357
  """Checks if a netmask is valid.
3358

3359
  @type cfg: L{config.ConfigWriter}
3360
  @param cfg: The cluster configuration
3361
  @type netmask: int
3362
  @param netmask: the netmask to be verified
3363
  @raise errors.OpPrereqError: if the validation fails
3364

3365
  """
3366
  ip_family = cfg.GetPrimaryIPFamily()
3367
  try:
3368
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
3369
  except errors.ProgrammerError:
3370
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
3371
                               ip_family)
3372
  if not ipcls.ValidateNetmask(netmask):
3373
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
3374
                                (netmask))
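
# Usage sketch (illustrative, not a real call site): the netmask here is a CIDR
# prefix length, so for an IPv4-family cluster something like
#
#   _ValidateNetmask(cfg, 24)   # accepted
#   _ValidateNetmask(cfg, 99)   # raises errors.OpPrereqError
#
# where "cfg" is the usual L{config.ConfigWriter} instance.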


class LUClusterSetParams(LogicalUnit):
3378
  """Change the parameters of the cluster.
3379

3380
  """
3381
  HPATH = "cluster-modify"
3382
  HTYPE = constants.HTYPE_CLUSTER
3383
  REQ_BGL = False
3384

    
3385
  def CheckArguments(self):
3386
    """Check parameters
3387

3388
    """
3389
    if self.op.uid_pool:
3390
      uidpool.CheckUidPool(self.op.uid_pool)
3391

    
3392
    if self.op.add_uids:
3393
      uidpool.CheckUidPool(self.op.add_uids)
3394

    
3395
    if self.op.remove_uids:
3396
      uidpool.CheckUidPool(self.op.remove_uids)
3397

    
3398
    if self.op.master_netmask is not None:
3399
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3400

    
3401
  def ExpandNames(self):
3402
    # FIXME: in the future maybe other cluster params won't require checking on
3403
    # all nodes to be modified.
3404
    self.needed_locks = {
3405
      locking.LEVEL_NODE: locking.ALL_SET,
3406
    }
3407
    self.share_locks[locking.LEVEL_NODE] = 1
3408

    
3409
  def BuildHooksEnv(self):
3410
    """Build hooks env.
3411

3412
    """
3413
    return {
3414
      "OP_TARGET": self.cfg.GetClusterName(),
3415
      "NEW_VG_NAME": self.op.vg_name,
3416
      }
3417

    
3418
  def BuildHooksNodes(self):
3419
    """Build hooks nodes.
3420

3421
    """
3422
    mn = self.cfg.GetMasterNode()
3423
    return ([mn], [mn])
3424

    
3425
  def CheckPrereq(self):
3426
    """Check prerequisites.
3427

3428
    This checks whether the given params don't conflict and
3429
    if the given volume group is valid.
3430

3431
    """
3432
    if self.op.vg_name is not None and not self.op.vg_name:
3433
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3434
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3435
                                   " instances exist", errors.ECODE_INVAL)
3436

    
3437
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3438
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3439
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3440
                                   " drbd-based instances exist",
3441
                                   errors.ECODE_INVAL)
3442

    
3443
    node_list = self.owned_locks(locking.LEVEL_NODE)
3444

    
3445
    # if vg_name is not None, check the given volume group on all nodes
3446
    if self.op.vg_name:
3447
      vglist = self.rpc.call_vg_list(node_list)
3448
      for node in node_list:
3449
        msg = vglist[node].fail_msg
3450
        if msg:
3451
          # ignoring down node
3452
          self.LogWarning("Error while gathering data on node %s"
3453
                          " (ignoring node): %s", node, msg)
3454
          continue
3455
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3456
                                              self.op.vg_name,
3457
                                              constants.MIN_VG_SIZE)
3458
        if vgstatus:
3459
          raise errors.OpPrereqError("Error on node '%s': %s" %
3460
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3461

    
3462
    if self.op.drbd_helper:
3463
      # checks given drbd helper on all nodes
3464
      helpers = self.rpc.call_drbd_helper(node_list)
3465
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3466
        if ninfo.offline:
3467
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3468
          continue
3469
        msg = helpers[node].fail_msg
3470
        if msg:
3471
          raise errors.OpPrereqError("Error checking drbd helper on node"
3472
                                     " '%s': %s" % (node, msg),
3473
                                     errors.ECODE_ENVIRON)
3474
        node_helper = helpers[node].payload
3475
        if node_helper != self.op.drbd_helper:
3476
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3477
                                     (node, node_helper), errors.ECODE_ENVIRON)
3478

    
3479
    self.cluster = cluster = self.cfg.GetClusterInfo()
3480
    # validate params changes
3481
    if self.op.beparams:
3482
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3483
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3484

    
3485
    if self.op.ndparams:
3486
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3487
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3488

    
3489
      # TODO: we need a more general way to handle resetting
3490
      # cluster-level parameters to default values
3491
      if self.new_ndparams["oob_program"] == "":
3492
        self.new_ndparams["oob_program"] = \
3493
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3494

    
3495
    if self.op.nicparams:
3496
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3497
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3498
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3499
      nic_errors = []
3500

    
3501
      # check all instances for consistency
3502
      for instance in self.cfg.GetAllInstancesInfo().values():
3503
        for nic_idx, nic in enumerate(instance.nics):
3504
          params_copy = copy.deepcopy(nic.nicparams)
3505
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3506

    
3507
          # check parameter syntax
3508
          try:
3509
            objects.NIC.CheckParameterSyntax(params_filled)
3510
          except errors.ConfigurationError, err:
3511
            nic_errors.append("Instance %s, nic/%d: %s" %
3512
                              (instance.name, nic_idx, err))
3513

    
3514
          # if we're moving instances to routed, check that they have an ip
3515
          target_mode = params_filled[constants.NIC_MODE]
3516
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3517
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3518
                              " address" % (instance.name, nic_idx))
3519
      if nic_errors:
3520
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3521
                                   "\n".join(nic_errors))
3522

    
3523
    # hypervisor list/parameters
3524
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3525
    if self.op.hvparams:
3526
      for hv_name, hv_dict in self.op.hvparams.items():
3527
        if hv_name not in self.new_hvparams:
3528
          self.new_hvparams[hv_name] = hv_dict
3529
        else:
3530
          self.new_hvparams[hv_name].update(hv_dict)
3531

    
3532
    # os hypervisor parameters
3533
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3534
    if self.op.os_hvp:
3535
      for os_name, hvs in self.op.os_hvp.items():
3536
        if os_name not in self.new_os_hvp:
3537
          self.new_os_hvp[os_name] = hvs
3538
        else:
3539
          for hv_name, hv_dict in hvs.items():
3540
            if hv_name not in self.new_os_hvp[os_name]:
3541
              self.new_os_hvp[os_name][hv_name] = hv_dict
3542
            else:
3543
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3544

    
3545
    # os parameters
3546
    self.new_osp = objects.FillDict(cluster.osparams, {})
3547
    if self.op.osparams:
3548
      for os_name, osp in self.op.osparams.items():
3549
        if os_name not in self.new_osp:
3550
          self.new_osp[os_name] = {}
3551

    
3552
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3553
                                                  use_none=True)
3554

    
3555
        if not self.new_osp[os_name]:
3556
          # we removed all parameters
3557
          del self.new_osp[os_name]
3558
        else:
3559
          # check the parameter validity (remote check)
3560
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3561
                         os_name, self.new_osp[os_name])
3562

    
3563
    # changes to the hypervisor list
3564
    if self.op.enabled_hypervisors is not None:
3565
      self.hv_list = self.op.enabled_hypervisors
3566
      for hv in self.hv_list:
3567
        # if the hypervisor doesn't already exist in the cluster
3568
        # hvparams, we initialize it to empty, and then (in both
3569
        # cases) we make sure to fill the defaults, as we might not
3570
        # have a complete defaults list if the hypervisor wasn't
3571
        # enabled before
3572
        if hv not in new_hvp:
3573
          new_hvp[hv] = {}
3574
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3575
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3576
    else:
3577
      self.hv_list = cluster.enabled_hypervisors
3578

    
3579
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3580
      # either the enabled list has changed, or the parameters have, validate
3581
      for hv_name, hv_params in self.new_hvparams.items():
3582
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3583
            (self.op.enabled_hypervisors and
3584
             hv_name in self.op.enabled_hypervisors)):
3585
          # either this is a new hypervisor, or its parameters have changed
3586
          hv_class = hypervisor.GetHypervisor(hv_name)
3587
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3588
          hv_class.CheckParameterSyntax(hv_params)
3589
          _CheckHVParams(self, node_list, hv_name, hv_params)
3590

    
3591
    if self.op.os_hvp:
3592
      # no need to check any newly-enabled hypervisors, since the
3593
      # defaults have already been checked in the above code-block
3594
      for os_name, os_hvp in self.new_os_hvp.items():
3595
        for hv_name, hv_params in os_hvp.items():
3596
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3597
          # we need to fill in the new os_hvp on top of the actual hv_p
3598
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3599
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3600
          hv_class = hypervisor.GetHypervisor(hv_name)
3601
          hv_class.CheckParameterSyntax(new_osp)
3602
          _CheckHVParams(self, node_list, hv_name, new_osp)
3603

    
3604
    if self.op.default_iallocator:
3605
      alloc_script = utils.FindFile(self.op.default_iallocator,
3606
                                    constants.IALLOCATOR_SEARCH_PATH,
3607
                                    os.path.isfile)
3608
      if alloc_script is None:
3609
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3610
                                   " specified" % self.op.default_iallocator,
3611
                                   errors.ECODE_INVAL)
3612

    
3613
  def Exec(self, feedback_fn):
3614
    """Change the parameters of the cluster.
3615

3616
    """
3617
    if self.op.vg_name is not None:
3618
      new_volume = self.op.vg_name
3619
      if not new_volume:
3620
        new_volume = None
3621
      if new_volume != self.cfg.GetVGName():
3622
        self.cfg.SetVGName(new_volume)
3623
      else:
3624
        feedback_fn("Cluster LVM configuration already in desired"
3625
                    " state, not changing")
3626
    if self.op.drbd_helper is not None:
3627
      new_helper = self.op.drbd_helper
3628
      if not new_helper:
3629
        new_helper = None
3630
      if new_helper != self.cfg.GetDRBDHelper():
3631
        self.cfg.SetDRBDHelper(new_helper)
3632
      else:
3633
        feedback_fn("Cluster DRBD helper already in desired state,"
3634
                    " not changing")
3635
    if self.op.hvparams:
3636
      self.cluster.hvparams = self.new_hvparams
3637
    if self.op.os_hvp:
3638
      self.cluster.os_hvp = self.new_os_hvp
3639
    if self.op.enabled_hypervisors is not None:
3640
      self.cluster.hvparams = self.new_hvparams
3641
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3642
    if self.op.beparams:
3643
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3644
    if self.op.nicparams:
3645
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3646
    if self.op.osparams:
3647
      self.cluster.osparams = self.new_osp
3648
    if self.op.ndparams:
3649
      self.cluster.ndparams = self.new_ndparams
3650

    
3651
    if self.op.candidate_pool_size is not None:
3652
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3653
      # we need to update the pool size here, otherwise the save will fail
3654
      _AdjustCandidatePool(self, [])
3655

    
3656
    if self.op.maintain_node_health is not None:
3657
      self.cluster.maintain_node_health = self.op.maintain_node_health
3658

    
3659
    if self.op.prealloc_wipe_disks is not None:
3660
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3661

    
3662
    if self.op.add_uids is not None:
3663
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3664

    
3665
    if self.op.remove_uids is not None:
3666
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3667

    
3668
    if self.op.uid_pool is not None:
3669
      self.cluster.uid_pool = self.op.uid_pool
3670

    
3671
    if self.op.default_iallocator is not None:
3672
      self.cluster.default_iallocator = self.op.default_iallocator
3673

    
3674
    if self.op.reserved_lvs is not None:
3675
      self.cluster.reserved_lvs = self.op.reserved_lvs
3676

    
3677
    def helper_os(aname, mods, desc):
3678
      desc += " OS list"
3679
      lst = getattr(self.cluster, aname)
3680
      for key, val in mods:
3681
        if key == constants.DDM_ADD:
3682
          if val in lst:
3683
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3684
          else:
3685
            lst.append(val)
3686
        elif key == constants.DDM_REMOVE:
3687
          if val in lst:
3688
            lst.remove(val)
3689
          else:
3690
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3691
        else:
3692
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
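
    # Illustrative call (hypothetical OS name): self.op.hidden_os is a list of
    # (modification, value) pairs, e.g. [(constants.DDM_ADD, "debootstrap")],
    # which would be applied as
    #
    #   helper_os("hidden_os", [(constants.DDM_ADD, "debootstrap")], "hidden")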
    if self.op.hidden_os:
3695
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3696

    
3697
    if self.op.blacklisted_os:
3698
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3699

    
3700
    if self.op.master_netdev:
3701
      master = self.cfg.GetMasterNode()
3702
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3703
                  self.cluster.master_netdev)
3704
      result = self.rpc.call_node_deactivate_master_ip(master)
3705
      result.Raise("Could not disable the master ip")
3706
      feedback_fn("Changing master_netdev from %s to %s" %
3707
                  (self.cluster.master_netdev, self.op.master_netdev))
3708
      self.cluster.master_netdev = self.op.master_netdev
3709

    
3710
    if self.op.master_netmask:
3711
      master = self.cfg.GetMasterNode()
3712
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3713
      result = self.rpc.call_node_change_master_netmask(master,
3714
                                                        self.op.master_netmask)
3715
      if result.fail_msg:
3716
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3717
        self.LogWarning(msg)
3718
        feedback_fn(msg)
3719
      else:
3720
        self.cluster.master_netmask = self.op.master_netmask
3721

    
3722
    self.cfg.Update(self.cluster, feedback_fn)
3723

    
3724
    if self.op.master_netdev:
3725
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3726
                  self.op.master_netdev)
3727
      result = self.rpc.call_node_activate_master_ip(master)
3728
      if result.fail_msg:
3729
        self.LogWarning("Could not re-enable the master ip on"
3730
                        " the master, please restart manually: %s",
3731
                        result.fail_msg)
3732

    
3733

    
3734
def _UploadHelper(lu, nodes, fname):
3735
  """Helper for uploading a file and showing warnings.
3736

3737
  """
3738
  if os.path.exists(fname):
3739
    result = lu.rpc.call_upload_file(nodes, fname)
3740
    for to_node, to_result in result.items():
3741
      msg = to_result.fail_msg
3742
      if msg:
3743
        msg = ("Copy of file %s to node %s failed: %s" %
3744
               (fname, to_node, msg))
3745
        lu.proc.LogWarning(msg)
3746

    
3747

    
3748
def _ComputeAncillaryFiles(cluster, redist):
3749
  """Compute files external to Ganeti which need to be consistent.
3750

3751
  @type redist: boolean
3752
  @param redist: Whether to include files which need to be redistributed
3753

3754
  """
3755
  # Compute files for all nodes
3756
  files_all = set([
3757
    constants.SSH_KNOWN_HOSTS_FILE,
3758
    constants.CONFD_HMAC_KEY,
3759
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3760
    constants.SPICE_CERT_FILE,
3761
    constants.SPICE_CACERT_FILE,
3762
    constants.RAPI_USERS_FILE,
3763
    ])
3764

    
3765
  if not redist:
3766
    files_all.update(constants.ALL_CERT_FILES)
3767
    files_all.update(ssconf.SimpleStore().GetFileList())
3768
  else:
3769
    # we need to ship at least the RAPI certificate
3770
    files_all.add(constants.RAPI_CERT_FILE)
3771

    
3772
  if cluster.modify_etc_hosts:
3773
    files_all.add(constants.ETC_HOSTS)
3774

    
3775
  # Files which are optional; these must:
3776
  # - be present in one other category as well
3777
  # - either exist or not exist on all nodes of that category (mc, vm all)
3778
  files_opt = set([
3779
    constants.RAPI_USERS_FILE,
3780
    ])
3781

    
3782
  # Files which should only be on master candidates
3783
  files_mc = set()
3784
  if not redist:
3785
    files_mc.add(constants.CLUSTER_CONF_FILE)
3786

    
3787
  # Files which should only be on VM-capable nodes
3788
  files_vm = set(filename
3789
    for hv_name in cluster.enabled_hypervisors
3790
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3791

    
3792
  files_opt |= set(filename
3793
    for hv_name in cluster.enabled_hypervisors
3794
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3795

    
3796
  # Filenames in each category must be unique
3797
  all_files_set = files_all | files_mc | files_vm
3798
  assert (len(all_files_set) ==
3799
          sum(map(len, [files_all, files_mc, files_vm]))), \
3800
         "Found file listed in more than one file list"
3801

    
3802
  # Optional files must be present in one other category
3803
  assert all_files_set.issuperset(files_opt), \
3804
         "Optional file not in a different required list"
3805

    
3806
  return (files_all, files_opt, files_mc, files_vm)
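
# Example consumer (see _RedistributeAncillaryFiles below): callers typically
# unpack the tuple and pair each file set with the node list it applies to,
# e.g.
#
#   (files_all, _, files_mc, files_vm) = _ComputeAncillaryFiles(cluster, True)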


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3810
  """Distribute additional files which are part of the cluster configuration.
3811

3812
  ConfigWriter takes care of distributing the config and ssconf files, but
3813
  there are more files which should be distributed to all nodes. This function
3814
  makes sure those are copied.
3815

3816
  @param lu: calling logical unit
3817
  @param additional_nodes: list of nodes not in the config to distribute to
3818
  @type additional_vm: boolean
3819
  @param additional_vm: whether the additional nodes are vm-capable or not
3820

3821
  """
3822
  # Gather target nodes
3823
  cluster = lu.cfg.GetClusterInfo()
3824
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3825

    
3826
  online_nodes = lu.cfg.GetOnlineNodeList()
3827
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3828

    
3829
  if additional_nodes is not None:
3830
    online_nodes.extend(additional_nodes)
3831
    if additional_vm:
3832
      vm_nodes.extend(additional_nodes)
3833

    
3834
  # Never distribute to master node
3835
  for nodelist in [online_nodes, vm_nodes]:
3836
    if master_info.name in nodelist:
3837
      nodelist.remove(master_info.name)
3838

    
3839
  # Gather file lists
3840
  (files_all, _, files_mc, files_vm) = \
3841
    _ComputeAncillaryFiles(cluster, True)
3842

    
3843
  # Never re-distribute configuration file from here
3844
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3845
              constants.CLUSTER_CONF_FILE in files_vm)
3846
  assert not files_mc, "Master candidates not handled in this function"
3847

    
3848
  filemap = [
3849
    (online_nodes, files_all),
3850
    (vm_nodes, files_vm),
3851
    ]
3852

    
3853
  # Upload the files
3854
  for (node_list, files) in filemap:
3855
    for fname in files:
3856
      _UploadHelper(lu, node_list, fname)
3857

    
3858

    
3859
class LUClusterRedistConf(NoHooksLU):
3860
  """Force the redistribution of cluster configuration.
3861

3862
  This is a very simple LU.
3863

3864
  """
3865
  REQ_BGL = False
3866

    
3867
  def ExpandNames(self):
3868
    self.needed_locks = {
3869
      locking.LEVEL_NODE: locking.ALL_SET,
3870
    }
3871
    self.share_locks[locking.LEVEL_NODE] = 1
3872

    
3873
  def Exec(self, feedback_fn):
3874
    """Redistribute the configuration.
3875

3876
    """
3877
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3878
    _RedistributeAncillaryFiles(self)
3879

    
3880

    
3881
class LUClusterActivateMasterIp(NoHooksLU):
3882
  """Activate the master IP on the master node.
3883

3884
  """
3885
  def Exec(self, feedback_fn):
3886
    """Activate the master IP.
3887

3888
    """
3889
    master = self.cfg.GetMasterNode()
3890
    self.rpc.call_node_activate_master_ip(master)
3891

    
3892

    
3893
class LUClusterDeactivateMasterIp(NoHooksLU):
3894
  """Deactivate the master IP on the master node.
3895

3896
  """
3897
  def Exec(self, feedback_fn):
3898
    """Deactivate the master IP.
3899

3900
    """
3901
    master = self.cfg.GetMasterNode()
3902
    self.rpc.call_node_deactivate_master_ip(master)
3903

    
3904

    
3905
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3906
  """Sleep and poll for an instance's disk to sync.
3907

3908
  """
3909
  if not instance.disks or disks is not None and not disks:
3910
    return True
3911

    
3912
  disks = _ExpandCheckDisks(instance, disks)
3913

    
3914
  if not oneshot:
3915
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3916

    
3917
  node = instance.primary_node
3918

    
3919
  for dev in disks:
3920
    lu.cfg.SetDiskID(dev, node)
3921

    
3922
  # TODO: Convert to utils.Retry
3923

    
3924
  retries = 0
3925
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3926
  while True:
3927
    max_time = 0
3928
    done = True
3929
    cumul_degraded = False
3930
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3931
    msg = rstats.fail_msg
3932
    if msg:
3933
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3934
      retries += 1
3935
      if retries >= 10:
3936
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3937
                                 " aborting." % node)
3938
      time.sleep(6)
3939
      continue
3940
    rstats = rstats.payload
3941
    retries = 0
3942
    for i, mstat in enumerate(rstats):
3943
      if mstat is None:
3944
        lu.LogWarning("Can't compute data for node %s/%s",
3945
                      node, disks[i].iv_name)
3946
        continue
3947

    
3948
      cumul_degraded = (cumul_degraded or
3949
                        (mstat.is_degraded and mstat.sync_percent is None))
3950
      if mstat.sync_percent is not None:
3951
        done = False
3952
        if mstat.estimated_time is not None:
3953
          rem_time = ("%s remaining (estimated)" %
3954
                      utils.FormatSeconds(mstat.estimated_time))
3955
          max_time = mstat.estimated_time
3956
        else:
3957
          rem_time = "no time estimate"
3958
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3959
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3960

    
3961
    # if we're done but degraded, let's do a few small retries, to
3962
    # make sure we see a stable and not transient situation; therefore
3963
    # we force restart of the loop
3964
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3965
      logging.info("Degraded disks found, %d retries left", degr_retries)
3966
      degr_retries -= 1
3967
      time.sleep(1)
3968
      continue
3969

    
3970
    if done or oneshot:
3971
      break
3972

    
3973
    time.sleep(min(60, max_time))
3974

    
3975
  if done:
3976
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
3977
  return not cumul_degraded
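
# Typical use (illustrative sketch, not a quote of the module's call sites):
# after creating or activating an instance's mirrored disks, an LU does
# something like
#
#   disk_abort = not _WaitForSync(self, instance)
#
# and treats a False return value (i.e. still-degraded disks) as an error.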


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3981
  """Check that mirrors are not degraded.
3982

3983
  The ldisk parameter, if True, will change the test from the
3984
  is_degraded attribute (which represents overall non-ok status for
3985
  the device(s)) to the ldisk (representing the local storage status).
3986

3987
  """
3988
  lu.cfg.SetDiskID(dev, node)
3989

    
3990
  result = True
3991

    
3992
  if on_primary or dev.AssembleOnSecondary():
3993
    rstats = lu.rpc.call_blockdev_find(node, dev)
3994
    msg = rstats.fail_msg
3995
    if msg:
3996
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
3997
      result = False
3998
    elif not rstats.payload:
3999
      lu.LogWarning("Can't find disk on node %s", node)
4000
      result = False
4001
    else:
4002
      if ldisk:
4003
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4004
      else:
4005
        result = result and not rstats.payload.is_degraded
4006

    
4007
  if dev.children:
4008
    for child in dev.children:
4009
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
4010

    
4011
  return result
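
# Sketch of a typical check (illustrative; "other_node" and the message are
# placeholders, not the module's real call sites):
#
#   if not _CheckDiskConsistency(self, dev, other_node, False):
#     raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)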


class LUOobCommand(NoHooksLU):
4015
  """Logical unit for OOB handling.
4016

4017
  """
4018
  REQ_BGL = False
4019
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
4020

    
4021
  def ExpandNames(self):
4022
    """Gather locks we need.
4023

4024
    """
4025
    if self.op.node_names:
4026
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
4027
      lock_names = self.op.node_names
4028
    else:
4029
      lock_names = locking.ALL_SET
4030

    
4031
    self.needed_locks = {
4032
      locking.LEVEL_NODE: lock_names,
4033
      }
4034

    
4035
  def CheckPrereq(self):
4036
    """Check prerequisites.
4037

4038
    This checks:
4039
     - the node exists in the configuration
4040
     - OOB is supported
4041

4042
    Any errors are signaled by raising errors.OpPrereqError.
4043

4044
    """
4045
    self.nodes = []
4046
    self.master_node = self.cfg.GetMasterNode()
4047

    
4048
    assert self.op.power_delay >= 0.0
4049

    
4050
    if self.op.node_names:
4051
      if (self.op.command in self._SKIP_MASTER and
4052
          self.master_node in self.op.node_names):
4053
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
4054
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
4055

    
4056
        if master_oob_handler:
4057
          additional_text = ("run '%s %s %s' if you want to operate on the"
4058
                             " master regardless") % (master_oob_handler,
4059
                                                      self.op.command,
4060
                                                      self.master_node)
4061
        else:
4062
          additional_text = "it does not support out-of-band operations"
4063

    
4064
        raise errors.OpPrereqError(("Operating on the master node %s is not"
4065
                                    " allowed for %s; %s") %
4066
                                   (self.master_node, self.op.command,
4067
                                    additional_text), errors.ECODE_INVAL)
4068
    else:
4069
      self.op.node_names = self.cfg.GetNodeList()
4070
      if self.op.command in self._SKIP_MASTER:
4071
        self.op.node_names.remove(self.master_node)
4072

    
4073
    if self.op.command in self._SKIP_MASTER:
4074
      assert self.master_node not in self.op.node_names
4075

    
4076
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
4077
      if node is None:
4078
        raise errors.OpPrereqError("Node %s not found" % node_name,
4079
                                   errors.ECODE_NOENT)
4080
      else:
4081
        self.nodes.append(node)
4082

    
4083
      if (not self.op.ignore_status and
4084
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
4085
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
4086
                                    " not marked offline") % node_name,
4087
                                   errors.ECODE_STATE)
4088

    
4089
  def Exec(self, feedback_fn):
4090
    """Execute OOB and return result if we expect any.
4091

4092
    """
4093
    master_node = self.master_node
4094
    ret = []
4095

    