# lib/cmdlib.py @ revision 3ac3f5e4
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht

import ganeti.masterd.instance # pylint: disable=W0611


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcodes.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs
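
# Illustrative sketch (not part of the upstream code): an LU's Exec method can
# hand follow-up work back to the job queue through ResultWithJobs like this;
# OpTestDelay is used purely as a placeholder payload:
#
#   def Exec(self, feedback_fn):
#     jobs = [[opcodes.OpTestDelay(duration=1)]]   # one job, one opcode
#     # extra keyword arguments are returned alongside the submitted job IDs
#     return ResultWithJobs(jobs, done="initial work finished")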


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused-argument and "could be
    # a function" warnings
    # pylint: disable=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to lock only some instances' nodes,
    or to lock only primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
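
# Illustrative sketch (not from the upstream code) of how the locking helpers
# above are usually combined by an instance-level LU: ExpandNames declares the
# instance lock and requests node-lock recalculation, and DeclareLocks then
# resolves the node locks via _LockInstancesNodes:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()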


class NoHooksLU(LogicalUnit): # pylint: disable=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, filter_, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Sort only if no names were requested
    self.sort_by_name = not self.names

    self.do_locking = None
    self.wanted = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    if self.do_locking:
      names = lu.owned_locks(lock_level)
    else:
      names = all_names

    if self.wanted == locking.ALL_SET:
      assert not self.names
      # caller didn't specify names, so ordering is not important
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return self.query.OldStyleQuery(self._GetQueryData(lu),
                                    sort_by_name=self.sort_by_name)
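
# Hypothetical skeleton (not part of the upstream code) showing how _QueryBase
# subclasses are wired up: FIELDS supplies the field definitions, ExpandNames
# and DeclareLocks configure locking on the owning LU, and _GetQueryData
# gathers whatever data layout those field definitions expect:
#
#   class _ExampleQuery(_QueryBase):
#     FIELDS = ...                        # a query.* field definition set
#
#     def ExpandNames(self, lu):
#       lu.needed_locks = {}
#       if self.names:
#         self.wanted = _GetWantedNodes(lu, self.names)
#       else:
#         self.wanted = locking.ALL_SET
#       self.do_locking = self.use_locking
#       if self.do_locking:
#         lu.needed_locks[locking.LEVEL_NODE] = self.wanted
#
#     def DeclareLocks(self, lu, level):
#       pass
#
#     def _GetQueryData(self, lu):
#       return ...                        # data in the layout FIELDS expects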


def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)
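
# Usage note (illustrative): an LU that wants every lock level acquired in
# shared mode can simply assign the result of _ShareAll() in ExpandNames:
#
#   self.share_locks = _ShareAll()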


def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups"
                               " are '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if nodes:
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
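
# Worked example (invented parameter names, for illustration only):
#
#   old = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}
#   upd = {"kernel_path": constants.VALUE_DEFAULT, "serial_console": True}
#   _GetUpdatedParams(old, upd)
#   # -> {"root_path": "/dev/sda1", "serial_console": True}
#
# i.e. VALUE_DEFAULT entries delete keys instead of setting them.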


def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf the locks are released
  @type level: member of ganeti.locking.LEVELS
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
         "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  if should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in lu.owned_locks(level):
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
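
# Illustrative call (not from the upstream code): once an LU has worked out
# which node it really needs, it would typically drop the other node locks
# like this (assuming the opcode has a node_name slot):
#
#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.node_name])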


def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)
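
# Shape of the returned mapping (names invented for illustration):
#
#   {("node1.example.com", "xenvg/disk0"): "inst1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "inst1.example.com"}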


def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # pylint: disable=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  if not tags:
    tags = []

  env["INSTANCE_TAGS"] = " ".join(tags)

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
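
# Abbreviated example of the environment built above for an instance with one
# NIC and one disk (values invented for illustration); the hooks runner later
# prefixes every key with "GANETI_":
#
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_MAC": "aa:00:00:35:d1:4e",
#    "INSTANCE_DISK_COUNT": 1, "INSTANCE_DISK0_SIZE": 10240, ...}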


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_up,
    "memory": bep[constants.BE_MEMORY],
    "vcpus": bep[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": bep,
    "hvp": hvp,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty

    
1223

    
1224
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1225
  """Check the sanity of iallocator and node arguments and use the
1226
  cluster-wide iallocator if appropriate.
1227

1228
  Check that at most one of (iallocator, node) is specified. If none is
1229
  specified, then the LU's opcode's iallocator slot is filled with the
1230
  cluster-wide default iallocator.
1231

1232
  @type iallocator_slot: string
1233
  @param iallocator_slot: the name of the opcode iallocator slot
1234
  @type node_slot: string
1235
  @param node_slot: the name of the opcode target node slot
1236

1237
  """
1238
  node = getattr(lu.op, node_slot, None)
1239
  iallocator = getattr(lu.op, iallocator_slot, None)
1240

    
1241
  if node is not None and iallocator is not None:
1242
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1243
                               errors.ECODE_INVAL)
1244
  elif node is None and iallocator is None:
1245
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1246
    if default_iallocator:
1247
      setattr(lu.op, iallocator_slot, default_iallocator)
1248
    else:
1249
      raise errors.OpPrereqError("No iallocator or node given and no"
1250
                                 " cluster-wide default iallocator found;"
1251
                                 " please specify either an iallocator or a"
1252
                                 " node, or set a cluster-wide default"
1253
                                 " iallocator")
1254

    
1255

    
1256
def _GetDefaultIAllocator(cfg, iallocator):
1257
  """Decides on which iallocator to use.
1258

1259
  @type cfg: L{config.ConfigWriter}
1260
  @param cfg: Cluster configuration object
1261
  @type iallocator: string or None
1262
  @param iallocator: Iallocator specified in opcode
1263
  @rtype: string
1264
  @return: Iallocator name
1265

1266
  """
1267
  if not iallocator:
1268
    # Use default iallocator
1269
    iallocator = cfg.GetDefaultIAllocator()
1270

    
1271
  if not iallocator:
1272
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1273
                               " opcode nor as a cluster-wide default",
1274
                               errors.ECODE_INVAL)
1275

    
1276
  return iallocator
1277

    
1278

    
1279
class LUClusterPostInit(LogicalUnit):
1280
  """Logical unit for running hooks after cluster initialization.
1281

1282
  """
1283
  HPATH = "cluster-init"
1284
  HTYPE = constants.HTYPE_CLUSTER
1285

    
1286
  def BuildHooksEnv(self):
1287
    """Build hooks env.
1288

1289
    """
1290
    return {
1291
      "OP_TARGET": self.cfg.GetClusterName(),
1292
      }
1293

    
1294
  def BuildHooksNodes(self):
1295
    """Build hooks nodes.
1296

1297
    """
1298
    return ([], [self.cfg.GetMasterNode()])
1299

    
1300
  def Exec(self, feedback_fn):
1301
    """Nothing to do.
1302

1303
    """
1304
    return True
1305

    
1306

    
1307
class LUClusterDestroy(LogicalUnit):
1308
  """Logical unit for destroying the cluster.
1309

1310
  """
1311
  HPATH = "cluster-destroy"
1312
  HTYPE = constants.HTYPE_CLUSTER
1313

    
1314
  def BuildHooksEnv(self):
1315
    """Build hooks env.
1316

1317
    """
1318
    return {
1319
      "OP_TARGET": self.cfg.GetClusterName(),
1320
      }
1321

    
1322
  def BuildHooksNodes(self):
1323
    """Build hooks nodes.
1324

1325
    """
1326
    return ([], [])
1327

    
1328
  def CheckPrereq(self):
1329
    """Check prerequisites.
1330

1331
    This checks whether the cluster is empty.
1332

1333
    Any errors are signaled by raising errors.OpPrereqError.
1334

1335
    """
1336
    master = self.cfg.GetMasterNode()
1337

    
1338
    nodelist = self.cfg.GetNodeList()
1339
    if len(nodelist) != 1 or nodelist[0] != master:
1340
      raise errors.OpPrereqError("There are still %d node(s) in"
1341
                                 " this cluster." % (len(nodelist) - 1),
1342
                                 errors.ECODE_INVAL)
1343
    instancelist = self.cfg.GetInstanceList()
1344
    if instancelist:
1345
      raise errors.OpPrereqError("There are still %d instance(s) in"
1346
                                 " this cluster." % len(instancelist),
1347
                                 errors.ECODE_INVAL)
1348

    
1349
  def Exec(self, feedback_fn):
1350
    """Destroys the cluster.
1351

1352
    """
1353
    master = self.cfg.GetMasterNode()
1354

    
1355
    # Run post hooks on master node before it's removed
1356
    _RunPostHook(self, master)
1357

    
1358
    result = self.rpc.call_node_deactivate_master_ip(master)
1359
    result.Raise("Could not disable the master role")
1360

    
1361
    return master
1362

    
1363

    
1364
def _VerifyCertificate(filename):
1365
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1366

1367
  @type filename: string
1368
  @param filename: Path to PEM file
1369

1370
  """
1371
  try:
1372
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1373
                                           utils.ReadFile(filename))
1374
  except Exception, err: # pylint: disable=W0703
1375
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1376
            "Failed to load X509 certificate %s: %s" % (filename, err))
1377

    
1378
  (errcode, msg) = \
1379
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1380
                                constants.SSL_CERT_EXPIRATION_ERROR)
1381

    
1382
  if msg:
1383
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1384
  else:
1385
    fnamemsg = None
1386

    
1387
  if errcode is None:
1388
    return (None, fnamemsg)
1389
  elif errcode == utils.CERT_WARNING:
1390
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1391
  elif errcode == utils.CERT_ERROR:
1392
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1393

    
1394
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1395

    
1396

    
1397
def _GetAllHypervisorParameters(cluster, instances):
1398
  """Compute the set of all hypervisor parameters.
1399

1400
  @type cluster: L{objects.Cluster}
1401
  @param cluster: the cluster object
1402
  @param instances: list of L{objects.Instance}
1403
  @param instances: additional instances from which to obtain parameters
1404
  @rtype: list of (origin, hypervisor, parameters)
1405
  @return: a list with all parameters found, indicating the hypervisor they
1406
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1407

1408
  """
1409
  hvp_data = []
1410

    
1411
  for hv_name in cluster.enabled_hypervisors:
1412
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1413

    
1414
  for os_name, os_hvp in cluster.os_hvp.items():
1415
    for hv_name, hv_params in os_hvp.items():
1416
      if hv_params:
1417
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1418
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1419

    
1420
  # TODO: collapse identical parameter values in a single one
1421
  for instance in instances:
1422
    if instance.hvparams:
1423
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1424
                       cluster.FillHV(instance)))
1425

    
1426
  return hvp_data
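
# The collected triples have this shape (hypervisor name and origins invented
# for illustration):
#
#   [("cluster", "xen-pvm", {... cluster-level defaults ...}),
#    ("os debian-wheezy", "xen-pvm", {... OS-specific parameters, filled ...}),
#    ("instance inst1.example.com", "xen-pvm", {... fully filled params ...})]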


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101

  def _ErrorIf(self, cond, ecode, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable=E1101

    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    (_, etxt, _) = ecode
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING

    if cond:
      self._Error(ecode, *args, **kwargs)

    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
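
# Illustrative sketch (not from the upstream code) of how the verify LUs below
# use this mix-in: the condition, error code and item are positional, extra
# arguments are interpolated into the message, and passing code=ETYPE_WARNING
# demotes a finding to a warning:
#
#   self._ErrorIf(bool(problems), constants.CV_ECLUSTERCFG, None,
#                 "configuration issues found: %s", utils.CommaJoin(problems))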
1486

    
1487

    
1488
class LUClusterVerify(NoHooksLU):
1489
  """Submits all jobs necessary to verify the cluster.
1490

1491
  """
1492
  REQ_BGL = False
1493

    
1494
  def ExpandNames(self):
1495
    self.needed_locks = {}
1496

    
1497
  def Exec(self, feedback_fn):
1498
    jobs = []
1499

    
1500
    if self.op.group_name:
1501
      groups = [self.op.group_name]
1502
      depends_fn = lambda: None
1503
    else:
1504
      groups = self.cfg.GetNodeGroupList()
1505

    
1506
      # Verify global configuration
1507
      jobs.append([
1508
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1509
        ])
1510

    
1511
      # Always depend on global verification
1512
      depends_fn = lambda: [(-len(jobs), [])]
1513

    
1514
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1515
                                            ignore_errors=self.op.ignore_errors,
1516
                                            depends=depends_fn())]
1517
                for group in groups)
1518

    
1519
    # Fix up all parameters
1520
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = True

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.
1624

1625
  """
1626
  HPATH = "cluster-verify"
1627
  HTYPE = constants.HTYPE_CLUSTER
1628
  REQ_BGL = False
1629

    
1630
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1631

    
1632
  class NodeImage(object):
1633
    """A class representing the logical and physical status of a node.
1634

1635
    @type name: string
1636
    @ivar name: the node name to which this object refers
1637
    @ivar volumes: a structure as returned from
1638
        L{ganeti.backend.GetVolumeList} (runtime)
1639
    @ivar instances: a list of running instances (runtime)
1640
    @ivar pinst: list of configured primary instances (config)
1641
    @ivar sinst: list of configured secondary instances (config)
1642
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1643
        instances for which this node is secondary (config)
1644
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1645
    @ivar dfree: free disk, as reported by the node (runtime)
1646
    @ivar offline: the offline status (config)
1647
    @type rpc_fail: boolean
1648
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1649
        not whether the individual keys were correct) (runtime)
1650
    @type lvm_fail: boolean
1651
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1652
    @type hyp_fail: boolean
1653
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1654
    @type ghost: boolean
1655
    @ivar ghost: whether this is a known node or not (config)
1656
    @type os_fail: boolean
1657
    @ivar os_fail: whether the RPC call didn't return valid OS data
1658
    @type oslist: list
1659
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1660
    @type vm_capable: boolean
1661
    @ivar vm_capable: whether the node can host instances
1662

1663
    """
1664
    def __init__(self, offline=False, name=None, vm_capable=True):
1665
      self.name = name
1666
      self.volumes = {}
1667
      self.instances = []
1668
      self.pinst = []
1669
      self.sinst = []
1670
      self.sbp = {}
1671
      self.mfree = 0
1672
      self.dfree = 0
1673
      self.offline = offline
1674
      self.vm_capable = vm_capable
1675
      self.rpc_fail = False
1676
      self.lvm_fail = False
1677
      self.hyp_fail = False
1678
      self.ghost = False
1679
      self.os_fail = False
1680
      self.oslist = {}
1681

    
1682
  def ExpandNames(self):
1683
    # This raises errors.OpPrereqError on its own:
1684
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1685

    
1686
    # Get instances in node group; this is unsafe and needs verification later
1687
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
1688

    
1689
    self.needed_locks = {
1690
      locking.LEVEL_INSTANCE: inst_names,
1691
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1692
      locking.LEVEL_NODE: [],
1693
      }
1694

    
1695
    self.share_locks = _ShareAll()
1696

    
1697
  def DeclareLocks(self, level):
1698
    if level == locking.LEVEL_NODE:
1699
      # Get members of node group; this is unsafe and needs verification later
1700
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1701

    
1702
      all_inst_info = self.cfg.GetAllInstancesInfo()
1703

    
1704
      # In Exec(), we warn about mirrored instances that have primary and
1705
      # secondary living in separate node groups. To fully verify that
1706
      # volumes for these instances are healthy, we will need to do an
1707
      # extra call to their secondaries. We ensure here those nodes will
1708
      # be locked.
1709
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1710
        # Important: access only the instances whose lock is owned
1711
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1712
          nodes.update(all_inst_info[inst].secondary_nodes)
1713

    
1714
      self.needed_locks[locking.LEVEL_NODE] = nodes
1715

    
1716
  def CheckPrereq(self):
1717
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
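    # MergeTime folds the (seconds, microseconds) pair reported by the node
    # back into a single timestamp; malformed data is reported as an error
    # below.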
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.
1936

1937
    @type ninfo: L{objects.Node}
1938
    @param ninfo: the node to check
1939
    @param nresult: the remote results for the node
1940

1941
    """
1942
    node = ninfo.name
1943
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1944

    
1945
    test = constants.NV_NODELIST not in nresult
1946
    _ErrorIf(test, constants.CV_ENODESSH, node,
1947
             "node hasn't returned node ssh connectivity data")
1948
    if not test:
1949
      if nresult[constants.NV_NODELIST]:
1950
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1951
          _ErrorIf(True, constants.CV_ENODESSH, node,
1952
                   "ssh communication with node '%s': %s", a_node, a_msg)
1953

    
1954
    test = constants.NV_NODENETTEST not in nresult
1955
    _ErrorIf(test, constants.CV_ENODENET, node,
1956
             "node hasn't returned node tcp connectivity data")
1957
    if not test:
1958
      if nresult[constants.NV_NODENETTEST]:
1959
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1960
        for anode in nlist:
1961
          _ErrorIf(True, constants.CV_ENODENET, node,
1962
                   "tcp communication with node '%s': %s",
1963
                   anode, nresult[constants.NV_NODENETTEST][anode])
1964

    
1965
    test = constants.NV_MASTERIP not in nresult
1966
    _ErrorIf(test, constants.CV_ENODENET, node,
1967
             "node hasn't returned node master IP reachability data")
1968
    if not test:
1969
      if not nresult[constants.NV_MASTERIP]:
1970
        if node == self.master_node:
1971
          msg = "the master node cannot reach the master IP (not configured?)"
1972
        else:
1973
          msg = "cannot reach the master IP"
1974
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1975

    
1976
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1977
                      diskstatus):
1978
    """Verify an instance.
1979

1980
    This function checks to see if the required block devices are
1981
    available on the instance's node.
1982

1983
    """
1984
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1985
    node_current = instanceconfig.primary_node
1986

    
1987
    node_vol_should = {}
1988
    instanceconfig.MapLVsByNode(node_vol_should)
1989

    
1990
    for node in node_vol_should:
1991
      n_img = node_image[node]
1992
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1993
        # ignore missing volumes on offline or broken nodes
1994
        continue
1995
      for volume in node_vol_should[node]:
1996
        test = volume not in n_img.volumes
1997
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1998
                 "volume %s missing on node %s", volume, node)
1999

    
2000
    if instanceconfig.admin_up:
2001
      pri_img = node_image[node_current]
2002
      test = instance not in pri_img.instances and not pri_img.offline
2003
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2004
               "instance not running on its primary node %s",
2005
               node_current)
2006

    
2007
    diskdata = [(nname, success, status, idx)
2008
                for (nname, disks) in diskstatus.items()
2009
                for idx, (success, status) in enumerate(disks)]
2010

    
2011
    for nname, success, bdev_status, idx in diskdata:
2012
      # the 'ghost node' construction in Exec() ensures that we have a
2013
      # node here
2014
      snode = node_image[nname]
2015
      bad_snode = snode.ghost or snode.offline
2016
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
2017
               constants.CV_EINSTANCEFAULTYDISK, instance,
2018
               "couldn't retrieve status for disk/%s on %s: %s",
2019
               idx, nname, bdev_status)
2020
      _ErrorIf((instanceconfig.admin_up and success and
2021
                bdev_status.ldisk_status == constants.LDS_FAULTY),
2022
               constants.CV_EINSTANCEFAULTYDISK, instance,
2023
               "disk/%s on %s is faulty", idx, nname)
2024

    
2025
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2026
    """Verify if there are any unknown volumes in the cluster.
2027

2028
    The .os, .swap and backup volumes are ignored. All other volumes are
2029
    reported as unknown.
2030

2031
    @type reserved: L{ganeti.utils.FieldSet}
2032
    @param reserved: a FieldSet of reserved volume names
2033

2034
    """
2035
    for node, n_img in node_image.items():
2036
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2037
        # skip non-healthy nodes
2038
        continue
2039
      for volume in n_img.volumes:
2040
        test = ((node not in node_vol_should or
2041
                volume not in node_vol_should[node]) and
2042
                not reserved.Matches(volume))
2043
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2044
                      "volume %s is unknown", volume)
2045

    
2046
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2047
    """Verify N+1 Memory Resilience.
2048

2049
    Check that if one single node dies we can still start all the
2050
    instances it was primary for.
2051

2052
    """
2053
    cluster_info = self.cfg.GetClusterInfo()
2054
    for node, n_img in node_image.items():
2055
      # This code checks that every node which is now listed as
2056
      # secondary has enough memory to host all instances it is
2057
      # supposed to should a single other node in the cluster fail.
2058
      # FIXME: not ready for failover to an arbitrary node
2059
      # FIXME: does not support file-backed instances
2060
      # WARNING: we currently take into account down instances as well
2061
      # as up ones, considering that even if they're down someone
2062
      # might want to start them even in the event of a node failure.
2063
      if n_img.offline:
2064
        # we're skipping offline nodes from the N+1 warning, since
2065
        # most likely we don't have good memory infromation from them;
2066
        # we already list instances living on such nodes, and that's
2067
        # enough warning
2068
        continue
2069
      for prinode, instances in n_img.sbp.items():
2070
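        # n_img.sbp maps each primary node to the instances that would fail
        # over onto this node; the memory of that primary's auto-balanced
        # instances must fit into this node's free memory.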
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_all_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
           "Found file listed in more than one file list"

    # Define functions determining which nodes to consider for a file
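    # A selector of None means the file is expected on every node.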
    files2nodefn = [
      (files_all, None),
      (files_all_opt, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_all_opt | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_all_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result == None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
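    # node_drbd maps every DRBD minor the configuration assigns to this node
    # to (instance name, whether the minor should be active because the
    # instance is configured up).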
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.
2347

2348
    @type ninfo: L{objects.Node}
2349
    @param ninfo: the node to check
2350
    @param nresult: the remote results for the node
2351

2352
    """
2353
    node = ninfo.name
2354
    # We just have to verify the paths on master and/or master candidates
2355
    # as the oob helper is invoked on the master
2356
    if ((ninfo.master_candidate or ninfo.master_capable) and
2357
        constants.NV_OOB_PATHS in nresult):
2358
      for path_result in nresult[constants.NV_OOB_PATHS]:
2359
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2360

    
2361
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2362
    """Verifies and updates the node volume data.
2363

2364
    This function will update a L{NodeImage}'s internal structures
2365
    with data from the remote call.
2366

2367
    @type ninfo: L{objects.Node}
2368
    @param ninfo: the node to check
2369
    @param nresult: the remote results for the node
2370
    @param nimg: the node image object
2371
    @param vg_name: the configured VG name
2372

2373
    """
2374
    node = ninfo.name
2375
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2376

    
2377
    nimg.lvm_fail = True
2378
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2379
    if vg_name is None:
2380
      pass
2381
    elif isinstance(lvdata, basestring):
2382
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2383
               utils.SafeEncode(lvdata))
2384
    elif not isinstance(lvdata, dict):
2385
      _ErrorIf(True, constants.CV_ENODELVM, node,
2386
               "rpc call to node failed (lvlist)")
2387
    else:
2388
      nimg.volumes = lvdata
2389
      nimg.lvm_fail = False
2390

    
2391
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2392
    """Verifies and updates the node instance list.
2393

2394
    If the listing was successful, then updates this node's instance
2395
    list. Otherwise, it marks the RPC call as failed for the instance
2396
    list key.
2397

2398
    @type ninfo: L{objects.Node}
2399
    @param ninfo: the node to check
2400
    @param nresult: the remote results for the node
2401
    @param nimg: the node image object
2402

2403
    """
2404
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2405
    test = not isinstance(idata, list)
2406
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2407
                  "rpc call to node failed (instancelist): %s",
2408
                  utils.SafeEncode(str(idata)))
2409
    if test:
2410
      nimg.hyp_fail = True
2411
    else:
2412
      nimg.instances = idata
2413

    
2414
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2415
    """Verifies and computes a node information map
2416

2417
    @type ninfo: L{objects.Node}
2418
    @param ninfo: the node to check
2419
    @param nresult: the remote results for the node
2420
    @param nimg: the node image object
2421
    @param vg_name: the configured VG name
2422

2423
    """
2424
    node = ninfo.name
2425
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2426

    
2427
    # try to read free memory (from the hypervisor)
2428
    hv_info = nresult.get(constants.NV_HVINFO, None)
2429
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2430
    _ErrorIf(test, constants.CV_ENODEHV, node,
2431
             "rpc call to node failed (hvinfo)")
2432
    if not test:
2433
      try:
2434
        nimg.mfree = int(hv_info["memory_free"])
2435
      except (ValueError, TypeError):
2436
        _ErrorIf(True, constants.CV_ENODERPC, node,
2437
                 "node returned invalid nodeinfo, check hypervisor")
2438

    
2439
    # FIXME: devise a free space model for file based instances as well
2440
    if vg_name is not None:
2441
      test = (constants.NV_VGLIST not in nresult or
2442
              vg_name not in nresult[constants.NV_VGLIST])
2443
      _ErrorIf(test, constants.CV_ENODELVM, node,
2444
               "node didn't return data for the volume group '%s'"
2445
               " - it is either missing or broken", vg_name)
2446
      if not test:
2447
        try:
2448
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2449
        except (ValueError, TypeError):
2450
          _ErrorIf(True, constants.CV_ENODERPC, node,
2451
                   "node returned invalid LVM info, check LVM status")
2452

    
2453
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2454
    """Gets per-disk status information for all instances.
2455

2456
    @type nodelist: list of strings
2457
    @param nodelist: Node names
2458
    @type node_image: dict of (name, L{objects.Node})
2459
    @param node_image: Node objects
2460
    @type instanceinfo: dict of (name, L{objects.Instance})
2461
    @param instanceinfo: Instance objects
2462
    @rtype: {instance: {node: [(succes, payload)]}}
2463
    @return: a dictionary of per-instance dictionaries with nodes as
2464
        keys and disk information as values; the disk information is a
2465
        list of tuples (success, payload)
2466

2467
    """
2468
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2469

    
2470
    node_disks = {}
2471
    node_disks_devonly = {}
2472
    diskless_instances = set()
2473
    diskless = constants.DT_DISKLESS
2474

    
2475
    for nname in nodelist:
2476
      node_instances = list(itertools.chain(node_image[nname].pinst,
2477
                                            node_image[nname].sinst))
2478
      diskless_instances.update(inst for inst in node_instances
2479
                                if instanceinfo[inst].disk_template == diskless)
2480
      disks = [(inst, disk)
2481
               for inst in node_instances
2482
               for disk in instanceinfo[inst].disks]
2483

    
2484
      if not disks:
2485
        # No need to collect data
2486
        continue
2487

    
2488
      node_disks[nname] = disks
2489

    
2490
      # Creating copies as SetDiskID below will modify the objects and that can
2491
      # lead to incorrect data returned from nodes
2492
      devonly = [dev.Copy() for (_, dev) in disks]
2493

    
2494
      for dev in devonly:
2495
        self.cfg.SetDiskID(dev, nname)
2496

    
2497
      node_disks_devonly[nname] = devonly
2498

    
2499
    assert len(node_disks) == len(node_disks_devonly)
2500

    
2501
    # Collect data from all nodes with disks
2502
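    # Each node's reply is expected to carry one (success, status) pair per
    # queried disk, in the same order as node_disks[nname]; anything else is
    # flagged and replaced below.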
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
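    # Take the online nodes outside the group being verified, group them by
    # node group and return one endlessly cycling iterator of sorted node
    # names per foreign group.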
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
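    # Each online node draws the next name from every per-group iterator, so
    # it gets exactly one SSH target in each other group and the targets
    # rotate between source nodes.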
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various test on nodes.
2608

2609
    """
2610
    # This method has too many local variables. pylint: disable=R0914
2611
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2612

    
2613
    if not self.my_node_names:
2614
      # empty node group
2615
      feedback_fn("* Empty node group, skipping verification")
2616
      return True
2617

    
2618
    self.bad = False
2619
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2620
    verbose = self.op.verbose
2621
    self._feedback_fn = feedback_fn
2622

    
2623
    vg_name = self.cfg.GetVGName()
2624
    drbd_helper = self.cfg.GetDRBDHelper()
2625
    cluster = self.cfg.GetClusterInfo()
2626
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2627
    hypervisors = cluster.enabled_hypervisors
2628
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2629

    
2630
    i_non_redundant = [] # Non redundant instances
2631
    i_non_a_balanced = [] # Non auto-balanced instances
2632
    n_offline = 0 # Count of offline nodes
2633
    n_drained = 0 # Count of nodes being drained
2634
    node_vol_should = {}
2635

    
2636
    # FIXME: verify OS list
2637

    
2638
    # File verification
2639
    filemap = _ComputeAncillaryFiles(cluster, False)
2640

    
2641
    # do local checksums
2642
    master_node = self.master_node = self.cfg.GetMasterNode()
2643
    master_ip = self.cfg.GetMasterIP()
2644

    
2645
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2646

    
2647
    node_verify_param = {
2648
      constants.NV_FILELIST:
2649
        utils.UniqueSequence(filename
2650
                             for files in filemap
2651
                             for filename in files),
2652
      constants.NV_NODELIST:
2653
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2654
                                  self.all_node_info.values()),
2655
      constants.NV_HYPERVISOR: hypervisors,
2656
      constants.NV_HVPARAMS:
2657
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2658
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2659
                                 for node in node_data_list
2660
                                 if not node.offline],
2661
      constants.NV_INSTANCELIST: hypervisors,
2662
      constants.NV_VERSION: None,
2663
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2664
      constants.NV_NODESETUP: None,
2665
      constants.NV_TIME: None,
2666
      constants.NV_MASTERIP: (master_node, master_ip),
2667
      constants.NV_OSLIST: None,
2668
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2669
      }
2670

    
2671
    if vg_name is not None:
2672
      node_verify_param[constants.NV_VGLIST] = None
2673
      node_verify_param[constants.NV_LVLIST] = vg_name
2674
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2675
      node_verify_param[constants.NV_DRBDLIST] = None
2676

    
2677
    if drbd_helper:
2678
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2679

    
2680
    # bridge checks
2681
    # FIXME: this needs to be changed per node-group, not cluster-wide
2682
    bridges = set()
2683
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2684
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2685
      bridges.add(default_nicpp[constants.NIC_LINK])
2686
    for instance in self.my_inst_info.values():
2687
      for nic in instance.nics:
2688
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2689
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2690
          bridges.add(full_nic[constants.NIC_LINK])
2691

    
2692
    if bridges:
2693
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2694

    
2695
    # Build our expected cluster state
2696
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2697
                                                 name=node.name,
2698
                                                 vm_capable=node.vm_capable))
2699
                      for node in node_data_list)
2700

    
2701
    # Gather OOB paths
2702
    oob_paths = []
2703
    for node in self.all_node_info.values():
2704
      path = _SupportsOob(self.cfg, node)
2705
      if path and path not in oob_paths:
2706
        oob_paths.append(path)
2707

    
2708
    if oob_paths:
2709
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2710

    
2711
    for instance in self.my_inst_names:
2712
      inst_config = self.my_inst_info[instance]
2713

    
2714
      for nname in inst_config.all_nodes:
2715
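        # Instance nodes outside this group still need an image entry; nodes
        # that are missing from the configuration entirely are additionally
        # marked as 'ghost' so later checks can treat them specially.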
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if nodeinfo.vm_capable and not nodeinfo.offline:
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyOob(node_i, nresult)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
2863
      if verbose:
2864
        feedback_fn("* Verifying instance %s" % instance)
2865
      inst_config = self.my_inst_info[instance]
2866
      self._VerifyInstance(instance, inst_config, node_image,
2867
                           instdisk[instance])
2868
      inst_nodes_offline = []
2869

    
2870
      pnode = inst_config.primary_node
2871
      pnode_img = node_image[pnode]
2872
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2873
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2874
               " primary node failed", instance)
2875

    
2876
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2877
               constants.CV_EINSTANCEBADNODE, instance,
2878
               "instance is marked as running and lives on offline node %s",
2879
               inst_config.primary_node)
2880

    
2881
      # If the instance is non-redundant we cannot survive losing its primary
2882
      # node, so we are not N+1 compliant. On the other hand we have no disk
2883
      # templates with more than one secondary so that situation is not well
2884
      # supported either.
2885
      # FIXME: does not support file-backed instances
2886
      if not inst_config.secondary_nodes:
2887
        i_non_redundant.append(instance)
2888

    
2889
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2890
               constants.CV_EINSTANCELAYOUT,
2891
               instance, "instance has multiple secondary nodes: %s",
2892
               utils.CommaJoin(inst_config.secondary_nodes),
2893
               code=self.ETYPE_WARNING)
2894

    
2895
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2896
        pnode = inst_config.primary_node
2897
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2898
        instance_groups = {}
2899

    
2900
        for node in instance_nodes:
2901
          instance_groups.setdefault(self.all_node_info[node].group,
2902
                                     []).append(node)
2903

    
2904
        pretty_list = [
2905
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2906
          # Sort so that we always list the primary node first.
2907
          for group, nodes in sorted(instance_groups.items(),
2908
                                     key=lambda (_, nodes): pnode in nodes,
2909
                                     reverse=True)]
2910

    
2911
        self._ErrorIf(len(instance_groups) > 1,
2912
                      constants.CV_EINSTANCESPLITGROUPS,
2913
                      instance, "instance has primary and secondary nodes in"
2914
                      " different groups: %s", utils.CommaJoin(pretty_list),
2915
                      code=self.ETYPE_WARNING)
2916

    
2917
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2918
        i_non_a_balanced.append(instance)
2919

    
2920
      for snode in inst_config.secondary_nodes:
2921
        s_img = node_image[snode]
2922
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2923
                 snode, "instance %s, connection to secondary node failed",
2924
                 instance)
2925

    
2926
        if s_img.offline:
2927
          inst_nodes_offline.append(snode)
2928

    
2929
      # warn that the instance lives on offline nodes
2930
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2931
               "instance has offline secondary node(s) %s",
2932
               utils.CommaJoin(inst_nodes_offline))
2933
      # ... or ghost/non-vm_capable nodes
2934
      for node in inst_config.all_nodes:
2935
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2936
                 instance, "instance lives on ghost node %s", node)
2937
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2938
                 instance, "instance lives on non-vm_capable node %s", node)
2939

    
2940
    feedback_fn("* Verifying orphan volumes")
2941
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2942

    
2943
    # We will get spurious "unknown volume" warnings if any node of this group
2944
    # is secondary for an instance whose primary is in another group. To avoid
2945
    # them, we find these instances and add their volumes to node_vol_should.
2946
    for inst in self.all_inst_info.values():
2947
      for secondary in inst.secondary_nodes:
2948
        if (secondary in self.my_node_info
2949
            and inst.name not in self.my_inst_info):
2950
          inst.MapLVsByNode(node_vol_should)
2951
          break
2952

    
2953
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2954

    
2955
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2956
      feedback_fn("* Verifying N+1 Memory redundancy")
2957
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2958

    
2959
    feedback_fn("* Other Notes")
2960
    if i_non_redundant:
2961
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2962
                  % len(i_non_redundant))
2963

    
2964
    if i_non_a_balanced:
2965
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2966
                  % len(i_non_a_balanced))
2967

    
2968
    if n_offline:
2969
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2970

    
2971
    if n_drained:
2972
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2973

    
2974
    return not self.bad
2975

    
2976
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
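    # Illustrative sketch (added commentary, hypothetical group names): on a
    # cluster with node groups "default" and "storage" this would submit two
    # single-opcode jobs, roughly
    #   ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name="default")],
    #                   [opcodes.OpGroupVerifyDisks(group_name="storage")]])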
3041

    
3042

    
3043
class LUGroupVerifyDisks(NoHooksLU):
3044
  """Verifies the status of all disks in a node group.
3045

3046
  """
3047
  REQ_BGL = False
3048

    
3049
  def ExpandNames(self):
3050
    # Raises errors.OpPrereqError on its own if group can't be found
3051
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3052

    
3053
    self.share_locks = _ShareAll()
3054
    self.needed_locks = {
3055
      locking.LEVEL_INSTANCE: [],
3056
      locking.LEVEL_NODEGROUP: [],
3057
      locking.LEVEL_NODE: [],
3058
      }
3059

    
3060
  def DeclareLocks(self, level):
3061
    if level == locking.LEVEL_INSTANCE:
3062
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3063

    
3064
      # Lock instances optimistically, needs verification once node and group
3065
      # locks have been acquired
3066
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3067
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3068

    
3069
    elif level == locking.LEVEL_NODEGROUP:
3070
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3071

    
3072
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3073
        set([self.group_uuid] +
3074
            # Lock all groups used by instances optimistically; this requires
3075
            # going via the node before it's locked, requiring verification
3076
            # later on
3077
            [group_uuid
3078
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3079
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3080

    
3081
    elif level == locking.LEVEL_NODE:
3082
      # This will only lock the nodes in the group to be verified which contain
3083
      # actual instances
3084
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3085
      self._LockInstancesNodes()
3086

    
3087
      # Lock all nodes in group to be verified
3088
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3089
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3090
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3091

    
3092
  def CheckPrereq(self):
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
                                             owned_groups)

      assert self.group_uuid in inst_groups, \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(key)

    return (res_nodes, list(res_instances), res_missing)
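    # Illustrative sketch of the return value (hypothetical names, added
    # commentary):
    #   ({"node3.example.com": "Error enumerating LVs"},          # node errors
    #    ["instance7"],                                   # needs activate-disks
    #    {"instance9": [("node2.example.com", "xenvg/disk0")]})   # missing LVs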


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
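    # Illustrative sketch (hypothetical sizes, added commentary): for a DRBD8
    # disk recorded at 10240 MiB whose data child is recorded at 10176 MiB,
    # the child size would be bumped to 10240 and True returned, prompting a
    # config update in Exec() below.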
3222

    
3223
  def Exec(self, feedback_fn):
3224
    """Verify the size of cluster disks.
3225

3226
    """
3227
    # TODO: check child disks too
3228
    # TODO: check differences in size between primary/secondary nodes
3229
    per_node_disks = {}
3230
    for instance in self.wanted_instances:
3231
      pnode = instance.primary_node
3232
      if pnode not in per_node_disks:
3233
        per_node_disks[pnode] = []
3234
      for idx, disk in enumerate(instance.disks):
3235
        per_node_disks[pnode].append((instance, idx, disk))
3236

    
3237
    changed = []
3238
    for node, dskl in per_node_disks.items():
3239
      newl = [v[2].Copy() for v in dskl]
3240
      for dsk in newl:
3241
        self.cfg.SetDiskID(dsk, node)
3242
      result = self.rpc.call_blockdev_getsize(node, newl)
3243
      if result.fail_msg:
3244
        self.LogWarning("Failure in blockdev_getsize call to node"
3245
                        " %s, ignoring", node)
3246
        continue
3247
      if len(result.payload) != len(dskl):
3248
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3249
                        " result.payload=%s", node, len(dskl), result.payload)
3250
        self.LogWarning("Invalid result from node %s, ignoring node results",
3251
                        node)
3252
        continue
3253
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3254
        if size is None:
3255
          self.LogWarning("Disk %d of instance %s did not return size"
3256
                          " information, ignoring", idx, instance.name)
3257
          continue
3258
        if not isinstance(size, (int, long)):
3259
          self.LogWarning("Disk %d of instance %s did not return valid"
3260
                          " size information, ignoring", idx, instance.name)
3261
          continue
3262
        size = size >> 20
3263
        if size != disk.size:
3264
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3265
                       " correcting: recorded %d, actual %d", idx,
3266
                       instance.name, disk.size, size)
3267
          disk.size = size
3268
          self.cfg.Update(instance, feedback_fn)
3269
          changed.append((instance.name, idx, size))
3270
        if self._EnsureChildSizes(disk):
3271
          self.cfg.Update(instance, feedback_fn)
3272
          changed.append((instance.name, idx, disk.size))
3273
    return changed
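  # Illustrative sketch (hypothetical values, added commentary): if disk 0 of
  # "inst1.example.com" is recorded as 10000 MiB but the node reports a larger
  # device (the payload is shifted by 20 bits above, i.e. presumably bytes to
  # MiB), Exec() updates the configuration and the result would include an
  # entry such as ("inst1.example.com", 0, 10240).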


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_deactivate_master_ip(master)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_activate_master_ip(master)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                                (netmask))
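# Illustrative sketch (added commentary): on an IPv4 cluster,
# _ValidateNetmask(cfg, 24) would pass (a /24 prefix length), while
# _ValidateNetmask(cfg, 99) would raise OpPrereqError since 99 is not a valid
# IPv4 prefix length.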


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
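    # Illustrative sketch of the two-level merge above (hypothetical values,
    # added commentary): with
    #   cluster.os_hvp = {"debian": {"kvm": {"acpi": True}}}
    # and
    #   self.op.os_hvp = {"debian": {"kvm": {"kernel_path": "/boot/vmlinuz"}}},
    # new_os_hvp["debian"]["kvm"] would end up containing both keys.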

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master = self.cfg.GetMasterNode()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master,
                                                        self.op.master_netmask)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        self.LogWarning(msg)
        feedback_fn(msg)
      else:
        self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_activate_master_ip(master)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())
  else:
    # we need to ship at least the RAPI certificate
    files_all.add(constants.RAPI_CERT_FILE)

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)
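# Illustrative sketch of the returned tuple (hypothetical paths, added
# commentary; the real values come from the constants module): a
# redistribution run might yield something like
#   ({"/etc/ssh/ssh_known_hosts", "/var/lib/ganeti/rapi.pem", ...},  # all nodes
#    {"/var/lib/ganeti/rapi/users"},   # optional, all-or-none
#    set(),                            # master candidates only
#    set())                            # VM-capable nodes only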


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)
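# Illustrative usage sketch (added commentary): LUClusterRedistConf below calls
# _RedistributeAncillaryFiles(lu) with no extra arguments, while node-addition
# code could pass additional_nodes=["newnode.example.com"] so a node receives
# the ancillary files before it shows up in the configuration.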


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master = self.cfg.GetMasterNode()
    self.rpc.call_node_activate_master_ip(master)


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master = self.cfg.GetMasterNode()
    self.rpc.call_node_deactivate_master_ip(master)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
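# Illustrative sketch of the polling behaviour (hypothetical numbers, added
# commentary): for a DRBD disk reporting sync_percent=42.0 and
# estimated_time=300, the loop sleeps min(60, 300) seconds between polls and
# keeps going until sync_percent becomes None; a transient degraded state gets
# up to 10 one-second re-checks before the final result is computed.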


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
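# Illustrative usage sketch (hypothetical call site, added commentary): before
# moving an instance, a caller could check its secondary with
#   _CheckDiskConsistency(self, dev, secondary_node, False, ldisk=True)
# so that only LDS_OKAY local storage on the target node counts as consistent.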


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
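  # Added commentary (not in the original source): power-off and power-cycle
  # are special-cased for the master because they would take down the node
  # coordinating this very command; CheckPrereq below either refuses the
  # request or silently drops the master from the node list for these commands.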

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue