
root / lib / cmdlib.py @ f0edfcf6


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62

    
63
import ganeti.masterd.instance # pylint: disable-msg=W0611
64

    
65

    
66
class ResultWithJobs:
67
  """Data container for LU results with jobs.
68

69
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
70
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
71
  contained in the C{jobs} attribute and include the job IDs in the opcode
72
  result.
73

74
  """
75
  def __init__(self, jobs, **kwargs):
76
    """Initializes this class.
77

78
    Additional return values can be specified as keyword arguments.
79

80
    @type jobs: list of lists of L{opcodes.OpCode}
81
    @param jobs: A list of lists of opcode objects
82

83
    """
84
    self.jobs = jobs
85
    self.other = kwargs
86

    
87
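# Editor's note: illustrative sketch only, not part of the original module.
# An LU's Exec() can hand follow-up jobs back to the master like this
# (OpTestDelay is merely a stand-in opcode):
#
#   def Exec(self, feedback_fn):
#     jobs = [[opcodes.OpTestDelay(duration=0)]]    # one job, one opcode
#     return ResultWithJobs(jobs, info="submitted by an example LU")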

    
88
class LogicalUnit(object):
89
  """Logical Unit base class.
90

91
  Subclasses must follow these rules:
92
    - implement ExpandNames
93
    - implement CheckPrereq (except when tasklets are used)
94
    - implement Exec (except when tasklets are used)
95
    - implement BuildHooksEnv
96
    - implement BuildHooksNodes
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
105

106
  """
107
  HPATH = None
108
  HTYPE = None
109
  REQ_BGL = True
110

    
111
  def __init__(self, processor, op, context, rpc):
112
    """Constructor for LogicalUnit.
113

114
    This needs to be overridden in derived classes in order to check op
115
    validity.
116

117
    """
118
    self.proc = processor
119
    self.op = op
120
    self.cfg = context.cfg
121
    self.glm = context.glm
122
    # readability alias
123
    self.owned_locks = context.glm.list_owned
124
    self.context = context
125
    self.rpc = rpc
126
    # Dicts used to declare locking needs to mcpu
127
    self.needed_locks = None
128
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
129
    self.add_locks = {}
130
    self.remove_locks = {}
131
    # Used to force good behavior when calling helper functions
132
    self.recalculate_locks = {}
133
    # logging
134
    self.Log = processor.Log # pylint: disable-msg=C0103
135
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
136
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
137
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
138
    # support for dry-run
139
    self.dry_run_result = None
140
    # support for generic debug attribute
141
    if (not hasattr(self.op, "debug_level") or
142
        not isinstance(self.op.debug_level, int)):
143
      self.op.debug_level = 0
144

    
145
    # Tasklets
146
    self.tasklets = None
147

    
148
    # Validate opcode parameters and set defaults
149
    self.op.Validate(True)
150

    
151
    self.CheckArguments()
152

    
153
  def CheckArguments(self):
154
    """Check syntactic validity for the opcode arguments.
155

156
    This method is for doing a simple syntactic check and ensuring
157
    validity of opcode parameters, without any cluster-related
158
    checks. While the same can be accomplished in ExpandNames and/or
159
    CheckPrereq, doing these separately is better because:
160

161
      - ExpandNames is left as purely a lock-related function
162
      - CheckPrereq is run after we have acquired locks (and possibly
163
        waited for them)
164

165
    The function is allowed to change the self.op attribute so that
166
    later methods need no longer worry about missing parameters.
167

168
    """
169
    pass
170

    
171
  def ExpandNames(self):
172
    """Expand names for this LU.
173

174
    This method is called before starting to execute the opcode, and it should
175
    update all the parameters of the opcode to their canonical form (e.g. a
176
    short node name must be fully expanded after this method has successfully
177
    completed). This way locking, hooks, logging, etc. can work correctly.
178

179
    LUs which implement this method must also populate the self.needed_locks
180
    member, as a dict with lock levels as keys, and a list of needed lock names
181
    as values. Rules:
182

183
      - use an empty dict if you don't need any lock
184
      - if you don't need any lock at a particular level omit that level
185
      - don't put anything for the BGL level
186
      - if you want all locks at a level use locking.ALL_SET as a value
187

188
    If you need to share locks (rather than acquire them exclusively) at one
189
    level you can modify self.share_locks, setting a true value (usually 1) for
190
    that level. By default locks are not shared.
191

192
    This function can also define a list of tasklets, which then will be
193
    executed in order instead of the usual LU-level CheckPrereq and Exec
194
    functions, if those are not defined by the LU.
195

196
    Examples::
197

198
      # Acquire all nodes and one instance
199
      self.needed_locks = {
200
        locking.LEVEL_NODE: locking.ALL_SET,
201
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
202
      }
203
      # Acquire just two nodes
204
      self.needed_locks = {
205
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
206
      }
207
      # Acquire no locks
208
      self.needed_locks = {} # No, you can't leave it to the default value None
209

210
    """
211
    # The implementation of this method is mandatory only if the new LU is
212
    # concurrent, so that old LUs don't need to be changed all at the same
213
    # time.
214
    if self.REQ_BGL:
215
      self.needed_locks = {} # Exclusive LUs don't need locks.
216
    else:
217
      raise NotImplementedError
218

    
219
  def DeclareLocks(self, level):
220
    """Declare LU locking needs for a level
221

222
    While most LUs can just declare their locking needs at ExpandNames time,
223
    sometimes there's the need to calculate some locks after having acquired
224
    the ones before. This function is called just before acquiring locks at a
225
    particular level, but after acquiring the ones at lower levels, and permits
226
    such calculations. It can be used to modify self.needed_locks, and by
227
    default it does nothing.
228

229
    This function is only called if you have something already set in
230
    self.needed_locks for the level.
231

232
    @param level: Locking level which is going to be locked
233
    @type level: member of ganeti.locking.LEVELS
234

235
    """
236

    
237
  def CheckPrereq(self):
238
    """Check prerequisites for this LU.
239

240
    This method should check that the prerequisites for the execution
241
    of this LU are fulfilled. It can do internode communication, but
242
    it should be idempotent - no cluster or system changes are
243
    allowed.
244

245
    The method should raise errors.OpPrereqError in case something is
246
    not fulfilled. Its return value is ignored.
247

248
    This method should also update all the parameters of the opcode to
249
    their canonical form if it hasn't been done by ExpandNames before.
250

251
    """
252
    if self.tasklets is not None:
253
      for (idx, tl) in enumerate(self.tasklets):
254
        logging.debug("Checking prerequisites for tasklet %s/%s",
255
                      idx + 1, len(self.tasklets))
256
        tl.CheckPrereq()
257
    else:
258
      pass
259

    
260
  def Exec(self, feedback_fn):
261
    """Execute the LU.
262

263
    This method should implement the actual work. It should raise
264
    errors.OpExecError for failures that are somewhat dealt with in
265
    code, or expected.
266

267
    """
268
    if self.tasklets is not None:
269
      for (idx, tl) in enumerate(self.tasklets):
270
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
271
        tl.Exec(feedback_fn)
272
    else:
273
      raise NotImplementedError
274

    
275
  def BuildHooksEnv(self):
276
    """Build hooks environment for this LU.
277

278
    @rtype: dict
279
    @return: Dictionary containing the environment that will be used for
280
      running the hooks for this LU. The keys of the dict must not be prefixed
281
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
282
      will extend the environment with additional variables. If no environment
283
      should be defined, an empty dictionary should be returned (not C{None}).
284
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
285
      will not be called.
286

287
    """
288
    raise NotImplementedError
289

    
290
  def BuildHooksNodes(self):
291
    """Build list of nodes to run LU's hooks.
292

293
    @rtype: tuple; (list, list)
294
    @return: Tuple containing a list of node names on which the hook
295
      should run before the execution and a list of node names on which the
296
      hook should run after the execution. No nodes should be returned as an
297
      empty list (and not None).
298
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
299
      will not be called.
300

301
    """
302
    raise NotImplementedError
303

    
304
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
305
    """Notify the LU about the results of its hooks.
306

307
    This method is called every time a hooks phase is executed, and notifies
308
    the Logical Unit about the hooks' result. The LU can then use it to alter
309
    its result based on the hooks.  By default the method does nothing and the
310
    previous result is passed back unchanged, but any LU can override it if it
311
    wants to use the local cluster hook-scripts somehow.
312

313
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
314
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
315
    @param hook_results: the results of the multi-node hooks rpc call
316
    @param feedback_fn: function used send feedback back to the caller
317
    @param lu_result: the previous Exec result this LU had, or None
318
        in the PRE phase
319
    @return: the new Exec result, based on the previous result
320
        and hook results
321

322
    """
323
    # API must be kept, thus we ignore the unused-argument and
324
    # could-be-a-function warnings
325
    # pylint: disable-msg=W0613,R0201
326
    return lu_result
327

    
328
  def _ExpandAndLockInstance(self):
329
    """Helper function to expand and lock an instance.
330

331
    Many LUs that work on an instance take its name in self.op.instance_name
332
    and need to expand it and then declare the expanded name for locking. This
333
    function does it, and then updates self.op.instance_name to the expanded
334
    name. It also initializes needed_locks as a dict, if this hasn't been done
335
    before.
336

337
    """
338
    if self.needed_locks is None:
339
      self.needed_locks = {}
340
    else:
341
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
342
        "_ExpandAndLockInstance called with instance-level locks set"
343
    self.op.instance_name = _ExpandInstanceName(self.cfg,
344
                                                self.op.instance_name)
345
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
346

    
347
  def _LockInstancesNodes(self, primary_only=False):
348
    """Helper function to declare instances' nodes for locking.
349

350
    This function should be called after locking one or more instances to lock
351
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
352
    with all primary or secondary nodes for instances already locked and
353
    present in self.needed_locks[locking.LEVEL_INSTANCE].
354

355
    It should be called from DeclareLocks, and for safety only works if
356
    self.recalculate_locks[locking.LEVEL_NODE] is set.
357

358
    In the future it may grow parameters to just lock some instance's nodes, or
359
    to just lock primary or secondary nodes, if needed.
360

361
    It should be called in DeclareLocks in a way similar to::
362

363
      if level == locking.LEVEL_NODE:
364
        self._LockInstancesNodes()
365

366
    @type primary_only: boolean
367
    @param primary_only: only lock primary nodes of locked instances
368

369
    """
370
    assert locking.LEVEL_NODE in self.recalculate_locks, \
371
      "_LockInstancesNodes helper function called with no nodes to recalculate"
372

    
373
    # TODO: check if we've really been called with the instance locks held
374

    
375
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
376
    # future we might want to have different behaviors depending on the value
377
    # of self.recalculate_locks[locking.LEVEL_NODE]
378
    wanted_nodes = []
379
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
380
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
381
      wanted_nodes.append(instance.primary_node)
382
      if not primary_only:
383
        wanted_nodes.extend(instance.secondary_nodes)
384

    
385
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
386
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
387
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
388
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
389

    
390
    del self.recalculate_locks[locking.LEVEL_NODE]
391

    
392
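# Editor's note: a minimal, hypothetical LU skeleton (illustrative only),
# showing the methods subclasses are expected to provide:
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}              # this LU needs no locks
#
#     def CheckPrereq(self):
#       pass                                # nothing to verify
#
#     def BuildHooksEnv(self):
#       return {"OP_TARGET": self.cfg.GetClusterName()}
#
#     def BuildHooksNodes(self):
#       return ([], [self.cfg.GetMasterNode()])
#
#     def Exec(self, feedback_fn):
#       feedback_fn("doing nothing")
#       return True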

    
393
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
394
  """Simple LU which runs no hooks.
395

396
  This LU is intended as a parent for other LogicalUnits which will
397
  run no hooks, in order to reduce duplicate code.
398

399
  """
400
  HPATH = None
401
  HTYPE = None
402

    
403
  def BuildHooksEnv(self):
404
    """Empty BuildHooksEnv for NoHooksLu.
405

406
    This just raises an error.
407

408
    """
409
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
410

    
411
  def BuildHooksNodes(self):
412
    """Empty BuildHooksNodes for NoHooksLU.
413

414
    """
415
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
416

    
417

    
418
class Tasklet:
419
  """Tasklet base class.
420

421
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
422
  they can mix legacy code with tasklets. Locking needs to be done in the LU;
423
  tasklets know nothing about locks.
424

425
  Subclasses must follow these rules:
426
    - Implement CheckPrereq
427
    - Implement Exec
428

429
  """
430
  def __init__(self, lu):
431
    self.lu = lu
432

    
433
    # Shortcuts
434
    self.cfg = lu.cfg
435
    self.rpc = lu.rpc
436

    
437
  def CheckPrereq(self):
438
    """Check prerequisites for this tasklets.
439

440
    This method should check whether the prerequisites for the execution of
441
    this tasklet are fulfilled. It can do internode communication, but it
442
    should be idempotent - no cluster or system changes are allowed.
443

444
    The method should raise errors.OpPrereqError in case something is not
445
    fulfilled. Its return value is ignored.
446

447
    This method should also update all parameters to their canonical form if it
448
    hasn't been done before.
449

450
    """
451
    pass
452

    
453
  def Exec(self, feedback_fn):
454
    """Execute the tasklet.
455

456
    This method should implement the actual work. It should raise
457
    errors.OpExecError for failures that are somewhat dealt with in code, or
458
    expected.
459

460
    """
461
    raise NotImplementedError
462

    
463
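# Editor's note: illustrative sketch, not from the original source. An LU can
# delegate its work to tasklets by filling self.tasklets in ExpandNames; the
# base-class CheckPrereq and Exec then iterate over them:
#
#   def ExpandNames(self):
#     self.needed_locks = {}
#     self.tasklets = [MyTasklet(self)]     # MyTasklet: hypothetical subclass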

    
464
class _QueryBase:
465
  """Base for query utility classes.
466

467
  """
468
  #: Attribute holding field definitions
469
  FIELDS = None
470

    
471
  def __init__(self, filter_, fields, use_locking):
472
    """Initializes this class.
473

474
    """
475
    self.use_locking = use_locking
476

    
477
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
478
                             namefield="name")
479
    self.requested_data = self.query.RequestedData()
480
    self.names = self.query.RequestedNames()
481

    
482
    # Sort only if no names were requested
483
    self.sort_by_name = not self.names
484

    
485
    self.do_locking = None
486
    self.wanted = None
487

    
488
  def _GetNames(self, lu, all_names, lock_level):
489
    """Helper function to determine names asked for in the query.
490

491
    """
492
    if self.do_locking:
493
      names = lu.owned_locks(lock_level)
494
    else:
495
      names = all_names
496

    
497
    if self.wanted == locking.ALL_SET:
498
      assert not self.names
499
      # caller didn't specify names, so ordering is not important
500
      return utils.NiceSort(names)
501

    
502
    # caller specified names and we must keep the same order
503
    assert self.names
504
    assert not self.do_locking or lu.glm.is_owned(lock_level)
505

    
506
    missing = set(self.wanted).difference(names)
507
    if missing:
508
      raise errors.OpExecError("Some items were removed before retrieving"
509
                               " their data: %s" % missing)
510

    
511
    # Return expanded names
512
    return self.wanted
513

    
514
  def ExpandNames(self, lu):
515
    """Expand names for this query.
516

517
    See L{LogicalUnit.ExpandNames}.
518

519
    """
520
    raise NotImplementedError()
521

    
522
  def DeclareLocks(self, lu, level):
523
    """Declare locks for this query.
524

525
    See L{LogicalUnit.DeclareLocks}.
526

527
    """
528
    raise NotImplementedError()
529

    
530
  def _GetQueryData(self, lu):
531
    """Collects all data for this query.
532

533
    @return: Query data object
534

535
    """
536
    raise NotImplementedError()
537

    
538
  def NewStyleQuery(self, lu):
539
    """Collect data and execute query.
540

541
    """
542
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
543
                                  sort_by_name=self.sort_by_name)
544

    
545
  def OldStyleQuery(self, lu):
546
    """Collect data and execute query.
547

548
    """
549
    return self.query.OldStyleQuery(self._GetQueryData(lu),
550
                                    sort_by_name=self.sort_by_name)
551

    
552

    
553
def _ShareAll():
554
  """Returns a dict declaring all lock levels shared.
555

556
  """
557
  return dict.fromkeys(locking.LEVELS, 1)
558

    
559
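# Editor's note: typical (hypothetical) use of _ShareAll in an LU's
# ExpandNames, acquiring every requested lock in shared mode:
#
#   self.share_locks = _ShareAll()
#   self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}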

    
560
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
561
  """Checks if the owned node groups are still correct for an instance.
562

563
  @type cfg: L{config.ConfigWriter}
564
  @param cfg: The cluster configuration
565
  @type instance_name: string
566
  @param instance_name: Instance name
567
  @type owned_groups: set or frozenset
568
  @param owned_groups: List of currently owned node groups
569

570
  """
571
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
572

    
573
  if not owned_groups.issuperset(inst_groups):
574
    raise errors.OpPrereqError("Instance %s's node groups changed since"
575
                               " locks were acquired, current groups are"
576
                               " are '%s', owning groups '%s'; retry the"
577
                               " operation" %
578
                               (instance_name,
579
                                utils.CommaJoin(inst_groups),
580
                                utils.CommaJoin(owned_groups)),
581
                               errors.ECODE_STATE)
582

    
583
  return inst_groups
584

    
585

    
586
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
587
  """Checks if the instances in a node group are still correct.
588

589
  @type cfg: L{config.ConfigWriter}
590
  @param cfg: The cluster configuration
591
  @type group_uuid: string
592
  @param group_uuid: Node group UUID
593
  @type owned_instances: set or frozenset
594
  @param owned_instances: List of currently owned instances
595

596
  """
597
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
598
  if owned_instances != wanted_instances:
599
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
600
                               " locks were acquired, wanted '%s', have '%s';"
601
                               " retry the operation" %
602
                               (group_uuid,
603
                                utils.CommaJoin(wanted_instances),
604
                                utils.CommaJoin(owned_instances)),
605
                               errors.ECODE_STATE)
606

    
607
  return wanted_instances
608

    
609

    
610
def _SupportsOob(cfg, node):
611
  """Tells if node supports OOB.
612

613
  @type cfg: L{config.ConfigWriter}
614
  @param cfg: The cluster configuration
615
  @type node: L{objects.Node}
616
  @param node: The node
617
  @return: The OOB script if supported or an empty string otherwise
618

619
  """
620
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
621

    
622

    
623
def _GetWantedNodes(lu, nodes):
624
  """Returns list of checked and expanded node names.
625

626
  @type lu: L{LogicalUnit}
627
  @param lu: the logical unit on whose behalf we execute
628
  @type nodes: list
629
  @param nodes: list of node names or None for all nodes
630
  @rtype: list
631
  @return: the list of nodes, sorted
632
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
633

634
  """
635
  if nodes:
636
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
637

    
638
  return utils.NiceSort(lu.cfg.GetNodeList())
639

    
640

    
641
def _GetWantedInstances(lu, instances):
642
  """Returns list of checked and expanded instance names.
643

644
  @type lu: L{LogicalUnit}
645
  @param lu: the logical unit on whose behalf we execute
646
  @type instances: list
647
  @param instances: list of instance names or None for all instances
648
  @rtype: list
649
  @return: the list of instances, sorted
650
  @raise errors.OpPrereqError: if the instances parameter is wrong type
651
  @raise errors.OpPrereqError: if any of the passed instances is not found
652

653
  """
654
  if instances:
655
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
656
  else:
657
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
658
  return wanted
659

    
660

    
661
def _GetUpdatedParams(old_params, update_dict,
662
                      use_default=True, use_none=False):
663
  """Return the new version of a parameter dictionary.
664

665
  @type old_params: dict
666
  @param old_params: old parameters
667
  @type update_dict: dict
668
  @param update_dict: dict containing new parameter values, or
669
      constants.VALUE_DEFAULT to reset the parameter to its default
670
      value
671
  @type use_default: boolean
672
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
673
      values as 'to be deleted' values
674
  @type use_none: boolean
675
  @param use_none: whether to recognise C{None} values as 'to be
676
      deleted' values
677
  @rtype: dict
678
  @return: the new parameter dictionary
679

680
  """
681
  params_copy = copy.deepcopy(old_params)
682
  for key, val in update_dict.iteritems():
683
    if ((use_default and val == constants.VALUE_DEFAULT) or
684
        (use_none and val is None)):
685
      try:
686
        del params_copy[key]
687
      except KeyError:
688
        pass
689
    else:
690
      params_copy[key] = val
691
  return params_copy
692

    
693
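# Editor's note: worked example (illustrative only) of _GetUpdatedParams,
# assuming constants.VALUE_DEFAULT marks a parameter for reset:
#
#   old = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}
#   new = _GetUpdatedParams(old, {"root_path": constants.VALUE_DEFAULT,
#                                 "serial_console": True})
#   # new == {"kernel_path": "/boot/vmlinuz", "serial_console": True}
#   # "root_path" was removed, i.e. reset to its cluster-level default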

    
694
def _ReleaseLocks(lu, level, names=None, keep=None):
695
  """Releases locks owned by an LU.
696

697
  @type lu: L{LogicalUnit}
698
  @param level: Lock level
699
  @type names: list or None
700
  @param names: Names of locks to release
701
  @type keep: list or None
702
  @param keep: Names of locks to retain
703

704
  """
705
  assert not (keep is not None and names is not None), \
706
         "Only one of the 'names' and the 'keep' parameters can be given"
707

    
708
  if names is not None:
709
    should_release = names.__contains__
710
  elif keep:
711
    should_release = lambda name: name not in keep
712
  else:
713
    should_release = None
714

    
715
  if should_release:
716
    retain = []
717
    release = []
718

    
719
    # Determine which locks to release
720
    for name in lu.owned_locks(level):
721
      if should_release(name):
722
        release.append(name)
723
      else:
724
        retain.append(name)
725

    
726
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
727

    
728
    # Release just some locks
729
    lu.glm.release(level, names=release)
730

    
731
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
732
  else:
733
    # Release everything
734
    lu.glm.release(level)
735

    
736
    assert not lu.glm.is_owned(level), "No locks should be owned"
737

    
738
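# Editor's note: illustrative calls (not from the original source):
#
#   # keep only one node lock, release the rest
#   _ReleaseLocks(lu, locking.LEVEL_NODE, keep=["node1.example.com"])
#   # release every instance lock owned by the LU
#   _ReleaseLocks(lu, locking.LEVEL_INSTANCE)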

    
739
def _MapInstanceDisksToNodes(instances):
740
  """Creates a map from (node, volume) to instance name.
741

742
  @type instances: list of L{objects.Instance}
743
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
744

745
  """
746
  return dict(((node, vol), inst.name)
747
              for inst in instances
748
              for (node, vols) in inst.MapLVsByNode().items()
749
              for vol in vols)
750

    
751
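# Editor's note: the mapping returned above looks like (hypothetical values):
#
#   {("node1.example.com", "xenvg/disk0"): "inst1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "inst1.example.com"}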

    
752
def _RunPostHook(lu, node_name):
753
  """Runs the post-hook for an opcode on a single node.
754

755
  """
756
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
757
  try:
758
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
759
  except:
760
    # pylint: disable-msg=W0702
761
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
762

    
763

    
764
def _CheckOutputFields(static, dynamic, selected):
765
  """Checks whether all selected fields are valid.
766

767
  @type static: L{utils.FieldSet}
768
  @param static: static fields set
769
  @type dynamic: L{utils.FieldSet}
770
  @param dynamic: dynamic fields set
  @type selected: list
  @param selected: the list of output fields selected by the user
771

772
  """
773
  f = utils.FieldSet()
774
  f.Extend(static)
775
  f.Extend(dynamic)
776

    
777
  delta = f.NonMatching(selected)
778
  if delta:
779
    raise errors.OpPrereqError("Unknown output fields selected: %s"
780
                               % ",".join(delta), errors.ECODE_INVAL)
781

    
782

    
783
def _CheckGlobalHvParams(params):
784
  """Validates that given hypervisor params are not global ones.
785

786
  This will ensure that instances don't get customised versions of
787
  global params.
788

789
  """
790
  used_globals = constants.HVC_GLOBALS.intersection(params)
791
  if used_globals:
792
    msg = ("The following hypervisor parameters are global and cannot"
793
           " be customized at instance level, please modify them at"
794
           " cluster level: %s" % utils.CommaJoin(used_globals))
795
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
796

    
797

    
798
def _CheckNodeOnline(lu, node, msg=None):
799
  """Ensure that a given node is online.
800

801
  @param lu: the LU on behalf of which we make the check
802
  @param node: the node to check
803
  @param msg: if passed, should be a message to replace the default one
804
  @raise errors.OpPrereqError: if the node is offline
805

806
  """
807
  if msg is None:
808
    msg = "Can't use offline node"
809
  if lu.cfg.GetNodeInfo(node).offline:
810
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
811

    
812

    
813
def _CheckNodeNotDrained(lu, node):
814
  """Ensure that a given node is not drained.
815

816
  @param lu: the LU on behalf of which we make the check
817
  @param node: the node to check
818
  @raise errors.OpPrereqError: if the node is drained
819

820
  """
821
  if lu.cfg.GetNodeInfo(node).drained:
822
    raise errors.OpPrereqError("Can't use drained node %s" % node,
823
                               errors.ECODE_STATE)
824

    
825

    
826
def _CheckNodeVmCapable(lu, node):
827
  """Ensure that a given node is vm capable.
828

829
  @param lu: the LU on behalf of which we make the check
830
  @param node: the node to check
831
  @raise errors.OpPrereqError: if the node is not vm capable
832

833
  """
834
  if not lu.cfg.GetNodeInfo(node).vm_capable:
835
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
836
                               errors.ECODE_STATE)
837

    
838

    
839
def _CheckNodeHasOS(lu, node, os_name, force_variant):
840
  """Ensure that a node supports a given OS.
841

842
  @param lu: the LU on behalf of which we make the check
843
  @param node: the node to check
844
  @param os_name: the OS to query about
845
  @param force_variant: whether to ignore variant errors
846
  @raise errors.OpPrereqError: if the node is not supporting the OS
847

848
  """
849
  result = lu.rpc.call_os_get(node, os_name)
850
  result.Raise("OS '%s' not in supported OS list for node %s" %
851
               (os_name, node),
852
               prereq=True, ecode=errors.ECODE_INVAL)
853
  if not force_variant:
854
    _CheckOSVariant(result.payload, os_name)
855

    
856

    
857
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
858
  """Ensure that a node has the given secondary ip.
859

860
  @type lu: L{LogicalUnit}
861
  @param lu: the LU on behalf of which we make the check
862
  @type node: string
863
  @param node: the node to check
864
  @type secondary_ip: string
865
  @param secondary_ip: the ip to check
866
  @type prereq: boolean
867
  @param prereq: whether to throw a prerequisite or an execute error
868
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
869
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
870

871
  """
872
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
873
  result.Raise("Failure checking secondary ip on node %s" % node,
874
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
875
  if not result.payload:
876
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
877
           " please fix and re-run this command" % secondary_ip)
878
    if prereq:
879
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
880
    else:
881
      raise errors.OpExecError(msg)
882

    
883

    
884
def _GetClusterDomainSecret():
885
  """Reads the cluster domain secret.
886

887
  """
888
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
889
                               strict=True)
890

    
891

    
892
def _CheckInstanceDown(lu, instance, reason):
893
  """Ensure that an instance is not running."""
894
  if instance.admin_up:
895
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
896
                               (instance.name, reason), errors.ECODE_STATE)
897

    
898
  pnode = instance.primary_node
899
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
900
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
901
              prereq=True, ecode=errors.ECODE_ENVIRON)
902

    
903
  if instance.name in ins_l.payload:
904
    raise errors.OpPrereqError("Instance %s is running, %s" %
905
                               (instance.name, reason), errors.ECODE_STATE)
906

    
907

    
908
def _ExpandItemName(fn, name, kind):
909
  """Expand an item name.
910

911
  @param fn: the function to use for expansion
912
  @param name: requested item name
913
  @param kind: text description ('Node' or 'Instance')
914
  @return: the resolved (full) name
915
  @raise errors.OpPrereqError: if the item is not found
916

917
  """
918
  full_name = fn(name)
919
  if full_name is None:
920
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
921
                               errors.ECODE_NOENT)
922
  return full_name
923

    
924

    
925
def _ExpandNodeName(cfg, name):
926
  """Wrapper over L{_ExpandItemName} for nodes."""
927
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
928

    
929

    
930
def _ExpandInstanceName(cfg, name):
931
  """Wrapper over L{_ExpandItemName} for instance."""
932
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
933

    
934
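# Editor's note: illustrative use of the expansion helpers (hypothetical
# names); both raise errors.OpPrereqError if the short name cannot be resolved:
#
#   full = _ExpandNodeName(self.cfg, "node1")         # -> "node1.example.com"
#   inst = _ExpandInstanceName(self.cfg, "inst1")     # -> "inst1.example.com"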

    
935
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
936
                          memory, vcpus, nics, disk_template, disks,
937
                          bep, hvp, hypervisor_name, tags):
938
  """Builds instance related env variables for hooks
939

940
  This builds the hook environment from individual variables.
941

942
  @type name: string
943
  @param name: the name of the instance
944
  @type primary_node: string
945
  @param primary_node: the name of the instance's primary node
946
  @type secondary_nodes: list
947
  @param secondary_nodes: list of secondary nodes as strings
948
  @type os_type: string
949
  @param os_type: the name of the instance's OS
950
  @type status: boolean
951
  @param status: the should_run status of the instance
952
  @type memory: string
953
  @param memory: the memory size of the instance
954
  @type vcpus: string
955
  @param vcpus: the count of VCPUs the instance has
956
  @type nics: list
957
  @param nics: list of tuples (ip, mac, mode, link) representing
958
      the NICs the instance has
959
  @type disk_template: string
960
  @param disk_template: the disk template of the instance
961
  @type disks: list
962
  @param disks: the list of (size, mode) pairs
963
  @type bep: dict
964
  @param bep: the backend parameters for the instance
965
  @type hvp: dict
966
  @param hvp: the hypervisor parameters for the instance
967
  @type hypervisor_name: string
968
  @param hypervisor_name: the hypervisor for the instance
969
  @type tags: list
970
  @param tags: list of instance tags as strings
971
  @rtype: dict
972
  @return: the hook environment for this instance
973

974
  """
975
  if status:
976
    str_status = "up"
977
  else:
978
    str_status = "down"
979
  env = {
980
    "OP_TARGET": name,
981
    "INSTANCE_NAME": name,
982
    "INSTANCE_PRIMARY": primary_node,
983
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
984
    "INSTANCE_OS_TYPE": os_type,
985
    "INSTANCE_STATUS": str_status,
986
    "INSTANCE_MEMORY": memory,
987
    "INSTANCE_VCPUS": vcpus,
988
    "INSTANCE_DISK_TEMPLATE": disk_template,
989
    "INSTANCE_HYPERVISOR": hypervisor_name,
990
  }
991

    
992
  if nics:
993
    nic_count = len(nics)
994
    for idx, (ip, mac, mode, link) in enumerate(nics):
995
      if ip is None:
996
        ip = ""
997
      env["INSTANCE_NIC%d_IP" % idx] = ip
998
      env["INSTANCE_NIC%d_MAC" % idx] = mac
999
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1000
      env["INSTANCE_NIC%d_LINK" % idx] = link
1001
      if mode == constants.NIC_MODE_BRIDGED:
1002
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1003
  else:
1004
    nic_count = 0
1005

    
1006
  env["INSTANCE_NIC_COUNT"] = nic_count
1007

    
1008
  if disks:
1009
    disk_count = len(disks)
1010
    for idx, (size, mode) in enumerate(disks):
1011
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1012
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1013
  else:
1014
    disk_count = 0
1015

    
1016
  env["INSTANCE_DISK_COUNT"] = disk_count
1017

    
1018
  if not tags:
1019
    tags = []
1020

    
1021
  env["INSTANCE_TAGS"] = " ".join(tags)
1022

    
1023
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1024
    for key, value in source.items():
1025
      env["INSTANCE_%s_%s" % (kind, key)] = value
1026

    
1027
  return env
1028

    
1029
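# Editor's note: a sample of the environment built above (hypothetical values,
# abridged):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 1024,
#   }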

    
1030
def _NICListToTuple(lu, nics):
1031
  """Build a list of nic information tuples.
1032

1033
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1034
  value in LUInstanceQueryData.
1035

1036
  @type lu:  L{LogicalUnit}
1037
  @param lu: the logical unit on whose behalf we execute
1038
  @type nics: list of L{objects.NIC}
1039
  @param nics: list of nics to convert to hooks tuples
1040

1041
  """
1042
  hooks_nics = []
1043
  cluster = lu.cfg.GetClusterInfo()
1044
  for nic in nics:
1045
    ip = nic.ip
1046
    mac = nic.mac
1047
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1048
    mode = filled_params[constants.NIC_MODE]
1049
    link = filled_params[constants.NIC_LINK]
1050
    hooks_nics.append((ip, mac, mode, link))
1051
  return hooks_nics
1052

    
1053

    
1054
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1055
  """Builds instance related env variables for hooks from an object.
1056

1057
  @type lu: L{LogicalUnit}
1058
  @param lu: the logical unit on whose behalf we execute
1059
  @type instance: L{objects.Instance}
1060
  @param instance: the instance for which we should build the
1061
      environment
1062
  @type override: dict
1063
  @param override: dictionary with key/values that will override
1064
      our values
1065
  @rtype: dict
1066
  @return: the hook environment dictionary
1067

1068
  """
1069
  cluster = lu.cfg.GetClusterInfo()
1070
  bep = cluster.FillBE(instance)
1071
  hvp = cluster.FillHV(instance)
1072
  args = {
1073
    "name": instance.name,
1074
    "primary_node": instance.primary_node,
1075
    "secondary_nodes": instance.secondary_nodes,
1076
    "os_type": instance.os,
1077
    "status": instance.admin_up,
1078
    "memory": bep[constants.BE_MEMORY],
1079
    "vcpus": bep[constants.BE_VCPUS],
1080
    "nics": _NICListToTuple(lu, instance.nics),
1081
    "disk_template": instance.disk_template,
1082
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1083
    "bep": bep,
1084
    "hvp": hvp,
1085
    "hypervisor_name": instance.hypervisor,
1086
    "tags": instance.tags,
1087
  }
1088
  if override:
1089
    args.update(override)
1090
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1091

    
1092

    
1093
def _AdjustCandidatePool(lu, exceptions):
1094
  """Adjust the candidate pool after node operations.
1095

1096
  """
1097
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1098
  if mod_list:
1099
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1100
               utils.CommaJoin(node.name for node in mod_list))
1101
    for name in mod_list:
1102
      lu.context.ReaddNode(name)
1103
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1104
  if mc_now > mc_max:
1105
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1106
               (mc_now, mc_max))
1107

    
1108

    
1109
def _DecideSelfPromotion(lu, exceptions=None):
1110
  """Decide whether I should promote myself as a master candidate.
1111

1112
  """
1113
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1114
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1115
  # the new node will increase mc_max by one, so:
1116
  mc_should = min(mc_should + 1, cp_size)
1117
  return mc_now < mc_should
1118

    
1119

    
1120
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1121
  """Check that the brigdes needed by a list of nics exist.
1122

1123
  """
1124
  cluster = lu.cfg.GetClusterInfo()
1125
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1126
  brlist = [params[constants.NIC_LINK] for params in paramslist
1127
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1128
  if brlist:
1129
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1130
    result.Raise("Error checking bridges on destination node '%s'" %
1131
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1132

    
1133

    
1134
def _CheckInstanceBridgesExist(lu, instance, node=None):
1135
  """Check that the brigdes needed by an instance exist.
1136

1137
  """
1138
  if node is None:
1139
    node = instance.primary_node
1140
  _CheckNicsBridgesExist(lu, instance.nics, node)
1141

    
1142

    
1143
def _CheckOSVariant(os_obj, name):
1144
  """Check whether an OS name conforms to the os variants specification.
1145

1146
  @type os_obj: L{objects.OS}
1147
  @param os_obj: OS object to check
1148
  @type name: string
1149
  @param name: OS name passed by the user, to check for validity
1150

1151
  """
1152
  variant = objects.OS.GetVariant(name)
1153
  if not os_obj.supported_variants:
1154
    if variant:
1155
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1156
                                 " passed)" % (os_obj.name, variant),
1157
                                 errors.ECODE_INVAL)
1158
    return
1159
  if not variant:
1160
    raise errors.OpPrereqError("OS name must include a variant",
1161
                               errors.ECODE_INVAL)
1162

    
1163
  if variant not in os_obj.supported_variants:
1164
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1165

    
1166

    
1167
def _GetNodeInstancesInner(cfg, fn):
1168
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1169

    
1170

    
1171
def _GetNodeInstances(cfg, node_name):
1172
  """Returns a list of all primary and secondary instances on a node.
1173

1174
  """
1175

    
1176
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1177

    
1178

    
1179
def _GetNodePrimaryInstances(cfg, node_name):
1180
  """Returns primary instances on a node.
1181

1182
  """
1183
  return _GetNodeInstancesInner(cfg,
1184
                                lambda inst: node_name == inst.primary_node)
1185

    
1186

    
1187
def _GetNodeSecondaryInstances(cfg, node_name):
1188
  """Returns secondary instances on a node.
1189

1190
  """
1191
  return _GetNodeInstancesInner(cfg,
1192
                                lambda inst: node_name in inst.secondary_nodes)
1193

    
1194

    
1195
def _GetStorageTypeArgs(cfg, storage_type):
1196
  """Returns the arguments for a storage type.
1197

1198
  """
1199
  # Special case for file storage
1200
  if storage_type == constants.ST_FILE:
1201
    # storage.FileStorage wants a list of storage directories
1202
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1203

    
1204
  return []
1205

    
1206

    
1207
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1208
  faulty = []
1209

    
1210
  for dev in instance.disks:
1211
    cfg.SetDiskID(dev, node_name)
1212

    
1213
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1214
  result.Raise("Failed to get disk status from node %s" % node_name,
1215
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1216

    
1217
  for idx, bdev_status in enumerate(result.payload):
1218
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1219
      faulty.append(idx)
1220

    
1221
  return faulty
1222

    
1223

    
1224
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1225
  """Check the sanity of iallocator and node arguments and use the
1226
  cluster-wide iallocator if appropriate.
1227

1228
  Check that at most one of (iallocator, node) is specified. If none is
1229
  specified, then the LU's opcode's iallocator slot is filled with the
1230
  cluster-wide default iallocator.
1231

1232
  @type iallocator_slot: string
1233
  @param iallocator_slot: the name of the opcode iallocator slot
1234
  @type node_slot: string
1235
  @param node_slot: the name of the opcode target node slot
1236

1237
  """
1238
  node = getattr(lu.op, node_slot, None)
1239
  iallocator = getattr(lu.op, iallocator_slot, None)
1240

    
1241
  if node is not None and iallocator is not None:
1242
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1243
                               errors.ECODE_INVAL)
1244
  elif node is None and iallocator is None:
1245
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1246
    if default_iallocator:
1247
      setattr(lu.op, iallocator_slot, default_iallocator)
1248
    else:
1249
      raise errors.OpPrereqError("No iallocator or node given and no"
1250
                                 " cluster-wide default iallocator found;"
1251
                                 " please specify either an iallocator or a"
1252
                                 " node, or set a cluster-wide default"
1253
                                 " iallocator")
1254

    
1255
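# Editor's note: typical (hypothetical) call from an LU's CheckArguments,
# where the opcode has "iallocator" and "target_node" slots:
#
#   _CheckIAllocatorOrNode(self, "iallocator", "target_node")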

    
1256
def _GetDefaultIAllocator(cfg, iallocator):
1257
  """Decides on which iallocator to use.
1258

1259
  @type cfg: L{config.ConfigWriter}
1260
  @param cfg: Cluster configuration object
1261
  @type iallocator: string or None
1262
  @param iallocator: Iallocator specified in opcode
1263
  @rtype: string
1264
  @return: Iallocator name
1265

1266
  """
1267
  if not iallocator:
1268
    # Use default iallocator
1269
    iallocator = cfg.GetDefaultIAllocator()
1270

    
1271
  if not iallocator:
1272
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1273
                               " opcode nor as a cluster-wide default",
1274
                               errors.ECODE_INVAL)
1275

    
1276
  return iallocator
1277

    
1278

    
1279
class LUClusterPostInit(LogicalUnit):
1280
  """Logical unit for running hooks after cluster initialization.
1281

1282
  """
1283
  HPATH = "cluster-init"
1284
  HTYPE = constants.HTYPE_CLUSTER
1285

    
1286
  def BuildHooksEnv(self):
1287
    """Build hooks env.
1288

1289
    """
1290
    return {
1291
      "OP_TARGET": self.cfg.GetClusterName(),
1292
      }
1293

    
1294
  def BuildHooksNodes(self):
1295
    """Build hooks nodes.
1296

1297
    """
1298
    return ([], [self.cfg.GetMasterNode()])
1299

    
1300
  def Exec(self, feedback_fn):
1301
    """Nothing to do.
1302

1303
    """
1304
    return True
1305

    
1306

    
1307
class LUClusterDestroy(LogicalUnit):
1308
  """Logical unit for destroying the cluster.
1309

1310
  """
1311
  HPATH = "cluster-destroy"
1312
  HTYPE = constants.HTYPE_CLUSTER
1313

    
1314
  def BuildHooksEnv(self):
1315
    """Build hooks env.
1316

1317
    """
1318
    return {
1319
      "OP_TARGET": self.cfg.GetClusterName(),
1320
      }
1321

    
1322
  def BuildHooksNodes(self):
1323
    """Build hooks nodes.
1324

1325
    """
1326
    return ([], [])
1327

    
1328
  def CheckPrereq(self):
1329
    """Check prerequisites.
1330

1331
    This checks whether the cluster is empty.
1332

1333
    Any errors are signaled by raising errors.OpPrereqError.
1334

1335
    """
1336
    master = self.cfg.GetMasterNode()
1337

    
1338
    nodelist = self.cfg.GetNodeList()
1339
    if len(nodelist) != 1 or nodelist[0] != master:
1340
      raise errors.OpPrereqError("There are still %d node(s) in"
1341
                                 " this cluster." % (len(nodelist) - 1),
1342
                                 errors.ECODE_INVAL)
1343
    instancelist = self.cfg.GetInstanceList()
1344
    if instancelist:
1345
      raise errors.OpPrereqError("There are still %d instance(s) in"
1346
                                 " this cluster." % len(instancelist),
1347
                                 errors.ECODE_INVAL)
1348

    
1349
  def Exec(self, feedback_fn):
1350
    """Destroys the cluster.
1351

1352
    """
1353
    master = self.cfg.GetMasterNode()
1354

    
1355
    # Run post hooks on master node before it's removed
1356
    _RunPostHook(self, master)
1357

    
1358
    result = self.rpc.call_node_stop_master(master, False)
1359
    result.Raise("Could not disable the master role")
1360

    
1361
    return master
1362

    
1363

    
1364
def _VerifyCertificate(filename):
1365
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1366

1367
  @type filename: string
1368
  @param filename: Path to PEM file
1369

1370
  """
1371
  try:
1372
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1373
                                           utils.ReadFile(filename))
1374
  except Exception, err: # pylint: disable-msg=W0703
1375
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1376
            "Failed to load X509 certificate %s: %s" % (filename, err))
1377

    
1378
  (errcode, msg) = \
1379
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1380
                                constants.SSL_CERT_EXPIRATION_ERROR)
1381

    
1382
  if msg:
1383
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1384
  else:
1385
    fnamemsg = None
1386

    
1387
  if errcode is None:
1388
    return (None, fnamemsg)
1389
  elif errcode == utils.CERT_WARNING:
1390
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1391
  elif errcode == utils.CERT_ERROR:
1392
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1393

    
1394
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1395

    
1396
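# Editor's note: possible return values of _VerifyCertificate (illustrative):
#
#   (None, None)                        # certificate loads and is valid
#   (LUClusterVerifyConfig.ETYPE_WARNING, "While verifying ...: ...")
#   (LUClusterVerifyConfig.ETYPE_ERROR, "Failed to load X509 certificate ...")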

    
1397
def _GetAllHypervisorParameters(cluster, instances):
1398
  """Compute the set of all hypervisor parameters.
1399

1400
  @type cluster: L{objects.Cluster}
1401
  @param cluster: the cluster object
1402
  @type instances: list of L{objects.Instance}
1403
  @param instances: additional instances from which to obtain parameters
1404
  @rtype: list of (origin, hypervisor, parameters)
1405
  @return: a list with all parameters found, indicating the hypervisor they
1406
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1407

1408
  """
1409
  hvp_data = []
1410

    
1411
  for hv_name in cluster.enabled_hypervisors:
1412
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1413

    
1414
  for os_name, os_hvp in cluster.os_hvp.items():
1415
    for hv_name, hv_params in os_hvp.items():
1416
      if hv_params:
1417
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1418
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1419

    
1420
  # TODO: collapse identical parameter values in a single one
1421
  for instance in instances:
1422
    if instance.hvparams:
1423
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1424
                       cluster.FillHV(instance)))
1425

    
1426
  return hvp_data
1427

    
1428

    
1429
class _VerifyErrors(object):
1430
  """Mix-in for cluster/group verify LUs.
1431

1432
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1433
  self.op and self._feedback_fn to be available.)
1434

1435
  """
1436
  TCLUSTER = "cluster"
1437
  TNODE = "node"
1438
  TINSTANCE = "instance"
1439

    
1440
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1441
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1442
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1443
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1444
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1445
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1446
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1447
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1448
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1449
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1450
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1451
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1452
  ENODEDRBD = (TNODE, "ENODEDRBD")
1453
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1454
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1455
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1456
  ENODEHV = (TNODE, "ENODEHV")
1457
  ENODELVM = (TNODE, "ENODELVM")
1458
  ENODEN1 = (TNODE, "ENODEN1")
1459
  ENODENET = (TNODE, "ENODENET")
1460
  ENODEOS = (TNODE, "ENODEOS")
1461
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1462
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1463
  ENODERPC = (TNODE, "ENODERPC")
1464
  ENODESSH = (TNODE, "ENODESSH")
1465
  ENODEVERSION = (TNODE, "ENODEVERSION")
1466
  ENODESETUP = (TNODE, "ENODESETUP")
1467
  ENODETIME = (TNODE, "ENODETIME")
1468
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1469

    
1470
  ETYPE_FIELD = "code"
1471
  ETYPE_ERROR = "ERROR"
1472
  ETYPE_WARNING = "WARNING"
1473

    
1474
  def _Error(self, ecode, item, msg, *args, **kwargs):
1475
    """Format an error message.
1476

1477
    Based on the opcode's error_codes parameter, either format a
1478
    parseable error code, or a simpler error string.
1479

1480
    This must be called only from Exec and functions called from Exec.
1481

1482
    """
1483
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1484
    itype, etxt = ecode
1485
    # first complete the msg
1486
    if args:
1487
      msg = msg % args
1488
    # then format the whole message
1489
    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
1490
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1491
    else:
1492
      if item:
1493
        item = " " + item
1494
      else:
1495
        item = ""
1496
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1497
    # and finally report it via the feedback_fn
1498
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
1499

    
1500
  def _ErrorIf(self, cond, *args, **kwargs):
1501
    """Log an error message if the passed condition is True.
1502

1503
    """
1504
    cond = (bool(cond)
1505
            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
1506
    if cond:
1507
      self._Error(*args, **kwargs)
1508
    # only mark the operation as failed for ERROR cases, not for warnings
1509
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1510
      self.bad = self.bad or cond
1511

    
1512
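# Editor's note: how the mix-in is typically used from a verify LU's Exec
# (illustrative; the alias follows the upstream naming convention):
#
#   _ErrorIf = self._ErrorIf                # pylint: disable-msg=C0103
#   _ErrorIf(test, self.ENODEHV, node,
#            "hypervisor verify failure: '%s'", msg)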

    
1513
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1514
  """Verifies the cluster config.
1515

1516
  """
1517
  REQ_BGL = True
1518

    
1519
  def _VerifyHVP(self, hvp_data):
1520
    """Verifies locally the syntax of the hypervisor parameters.
1521

1522
    """
1523
    for item, hv_name, hv_params in hvp_data:
1524
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1525
             (item, hv_name))
1526
      try:
1527
        hv_class = hypervisor.GetHypervisor(hv_name)
1528
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1529
        hv_class.CheckParameterSyntax(hv_params)
1530
      except errors.GenericError, err:
1531
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1532

    
1533
  def ExpandNames(self):
1534
    # Information can be safely retrieved as the BGL is acquired in exclusive
1535
    # mode
1536
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1537
    self.all_node_info = self.cfg.GetAllNodesInfo()
1538
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1539
    self.needed_locks = {}
1540

    
1541
  def Exec(self, feedback_fn):
    """Verify integrity of the cluster config.
1543

1544
    """
1545
    self.bad = False
1546
    self._feedback_fn = feedback_fn
1547

    
1548
    feedback_fn("* Verifying cluster config")
1549

    
1550
    for msg in self.cfg.VerifyConfig():
1551
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
1552

    
1553
    feedback_fn("* Verifying cluster certificate files")
1554

    
1555
    for cert_filename in constants.ALL_CERT_FILES:
1556
      (errcode, msg) = _VerifyCertificate(cert_filename)
1557
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1558

    
1559
    feedback_fn("* Verifying hypervisor parameters")
1560

    
1561
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1562
                                                self.all_inst_info.values()))
1563

    
1564
    feedback_fn("* Verifying all nodes belong to an existing group")
1565

    
1566
    # We do this verification here because, should this bogus circumstance
1567
    # occur, it would never be caught by VerifyGroup, which only acts on
1568
    # nodes/instances reachable from existing node groups.
1569

    
1570
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1571
                         if node.group not in self.all_group_info)
1572

    
1573
    dangling_instances = {}
1574
    no_node_instances = []
1575

    
1576
    for inst in self.all_inst_info.values():
1577
      if inst.primary_node in dangling_nodes:
1578
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1579
      elif inst.primary_node not in self.all_node_info:
1580
        no_node_instances.append(inst.name)
1581

    
1582
    pretty_dangling = [
1583
        "%s (%s)" %
1584
        (node.name,
1585
         utils.CommaJoin(dangling_instances.get(node.name,
1586
                                                ["no instances"])))
1587
        for node in dangling_nodes]
1588

    
1589
    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
                  "the following nodes (and their instances) belong to a"
                  " non-existing group: %s", utils.CommaJoin(pretty_dangling))
1592

    
1593
    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
1594
                  "the following instances have a non-existing primary-node:"
1595
                  " %s", utils.CommaJoin(no_node_instances))
1596

    
1597
    return (not self.bad, [g.name for g in self.all_group_info.values()])
1598

    
1599

    
1600
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1601
  """Verifies the status of a node group.
1602

1603
  """
1604
  HPATH = "cluster-verify"
1605
  HTYPE = constants.HTYPE_CLUSTER
1606
  REQ_BGL = False
1607

    
1608
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1609

    
1610
  class NodeImage(object):
1611
    """A class representing the logical and physical status of a node.
1612

1613
    @type name: string
1614
    @ivar name: the node name to which this object refers
1615
    @ivar volumes: a structure as returned from
1616
        L{ganeti.backend.GetVolumeList} (runtime)
1617
    @ivar instances: a list of running instances (runtime)
1618
    @ivar pinst: list of configured primary instances (config)
1619
    @ivar sinst: list of configured secondary instances (config)
1620
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1621
        instances for which this node is secondary (config)
1622
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1623
    @ivar dfree: free disk, as reported by the node (runtime)
1624
    @ivar offline: the offline status (config)
1625
    @type rpc_fail: boolean
1626
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1627
        not whether the individual keys were correct) (runtime)
1628
    @type lvm_fail: boolean
1629
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1630
    @type hyp_fail: boolean
1631
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1632
    @type ghost: boolean
1633
    @ivar ghost: whether this is a known node or not (config)
1634
    @type os_fail: boolean
1635
    @ivar os_fail: whether the RPC call didn't return valid OS data
1636
    @type oslist: list
1637
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1638
    @type vm_capable: boolean
1639
    @ivar vm_capable: whether the node can host instances
1640

1641
    """
1642
    def __init__(self, offline=False, name=None, vm_capable=True):
1643
      self.name = name
1644
      self.volumes = {}
1645
      self.instances = []
1646
      self.pinst = []
1647
      self.sinst = []
1648
      self.sbp = {}
1649
      self.mfree = 0
1650
      self.dfree = 0
1651
      self.offline = offline
1652
      self.vm_capable = vm_capable
1653
      self.rpc_fail = False
1654
      self.lvm_fail = False
1655
      self.hyp_fail = False
1656
      self.ghost = False
1657
      self.os_fail = False
1658
      self.oslist = {}
1659

    
1660
  def ExpandNames(self):
1661
    # This raises errors.OpPrereqError on its own:
1662
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1663

    
1664
    # Get instances in node group; this is unsafe and needs verification later
1665
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
1666

    
1667
    self.needed_locks = {
1668
      locking.LEVEL_INSTANCE: inst_names,
1669
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1670
      locking.LEVEL_NODE: [],
1671
      }
1672

    
1673
    self.share_locks = _ShareAll()
1674

    
1675
  def DeclareLocks(self, level):
1676
    if level == locking.LEVEL_NODE:
1677
      # Get members of node group; this is unsafe and needs verification later
1678
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1679

    
1680
      all_inst_info = self.cfg.GetAllInstancesInfo()
1681

    
1682
      # In Exec(), we warn about mirrored instances that have primary and
1683
      # secondary living in separate node groups. To fully verify that
1684
      # volumes for these instances are healthy, we will need to do an
1685
      # extra call to their secondaries. We ensure here those nodes will
1686
      # be locked.
1687
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1688
        # Important: access only the instances whose lock is owned
1689
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1690
          nodes.update(all_inst_info[inst].secondary_nodes)
1691

    
1692
      self.needed_locks[locking.LEVEL_NODE] = nodes
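
  # Illustrative sketch of the lock structure built by ExpandNames and
  # DeclareLocks above (names are made up): instance and node-group locks
  # are declared up front, while the node level is filled in here and may
  # include secondaries living outside the verified group.
  #
  #   needed_locks = {
  #     locking.LEVEL_INSTANCE: ["inst1.example.com"],
  #     locking.LEVEL_NODEGROUP: [group_uuid],
  #     locking.LEVEL_NODE: ["node1.example.com", "node2.example.com",
  #                          "node3.example.com"],  # node3: remote DRBD peer
  #     }
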
  def CheckPrereq(self):
1695
    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1696
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
1697

    
1698
    unlocked_nodes = \
1699
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1700

    
1701
    unlocked_instances = \
1702
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1703

    
1704
    if unlocked_nodes:
1705
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1706
                                 utils.CommaJoin(unlocked_nodes))
1707

    
1708
    if unlocked_instances:
1709
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1710
                                 utils.CommaJoin(unlocked_instances))
1711

    
1712
    self.all_node_info = self.cfg.GetAllNodesInfo()
1713
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1714

    
1715
    self.my_node_names = utils.NiceSort(group_nodes)
1716
    self.my_inst_names = utils.NiceSort(group_instances)
1717

    
1718
    self.my_node_info = dict((name, self.all_node_info[name])
1719
                             for name in self.my_node_names)
1720

    
1721
    self.my_inst_info = dict((name, self.all_inst_info[name])
1722
                             for name in self.my_inst_names)
1723

    
1724
    # We detect here the nodes that will need the extra RPC calls for verifying
1725
    # split LV volumes; they should be locked.
1726
    extra_lv_nodes = set()
1727

    
1728
    for inst in self.my_inst_info.values():
1729
      if inst.disk_template in constants.DTS_INT_MIRROR:
1730
        group = self.my_node_info[inst.primary_node].group
1731
        for nname in inst.secondary_nodes:
1732
          if self.all_node_info[nname].group != group:
1733
            extra_lv_nodes.add(nname)
1734

    
1735
    unlocked_lv_nodes = \
1736
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1737

    
1738
    if unlocked_lv_nodes:
1739
      raise errors.OpPrereqError("the following nodes are not locked: %s" %
1740
                                 utils.CommaJoin(unlocked_lv_nodes))
1741
    self.extra_lv_nodes = list(extra_lv_nodes)
1742

    
1743
  def _VerifyNode(self, ninfo, nresult):
1744
    """Perform some basic validation on data returned from a node.
1745

1746
      - check the result data structure is well formed and has all the
1747
        mandatory fields
1748
      - check ganeti version
1749

1750
    @type ninfo: L{objects.Node}
1751
    @param ninfo: the node to check
1752
    @param nresult: the results from the node
1753
    @rtype: boolean
1754
    @return: whether overall this call was successful (and we can expect
1755
         reasonable values in the response)
1756

1757
    """
1758
    node = ninfo.name
1759
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1760

    
1761
    # main result, nresult should be a non-empty dict
1762
    test = not nresult or not isinstance(nresult, dict)
1763
    _ErrorIf(test, self.ENODERPC, node,
1764
                  "unable to verify node: no data returned")
1765
    if test:
1766
      return False
1767

    
1768
    # compares ganeti version
1769
    local_version = constants.PROTOCOL_VERSION
1770
    remote_version = nresult.get("version", None)
1771
    test = not (remote_version and
1772
                isinstance(remote_version, (list, tuple)) and
1773
                len(remote_version) == 2)
1774
    _ErrorIf(test, self.ENODERPC, node,
1775
             "connection to node returned invalid data")
1776
    if test:
1777
      return False
1778

    
1779
    test = local_version != remote_version[0]
1780
    _ErrorIf(test, self.ENODEVERSION, node,
1781
             "incompatible protocol versions: master %s,"
1782
             " node %s", local_version, remote_version[0])
1783
    if test:
1784
      return False
1785

    
1786
    # node seems compatible, we can actually try to look into its results
1787

    
1788
    # full package version
1789
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1790
                  self.ENODEVERSION, node,
1791
                  "software version mismatch: master %s, node %s",
1792
                  constants.RELEASE_VERSION, remote_version[1],
1793
                  code=self.ETYPE_WARNING)
1794

    
1795
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1796
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1797
      for hv_name, hv_result in hyp_result.iteritems():
1798
        test = hv_result is not None
1799
        _ErrorIf(test, self.ENODEHV, node,
1800
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1801

    
1802
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1803
    if ninfo.vm_capable and isinstance(hvp_result, list):
1804
      for item, hv_name, hv_result in hvp_result:
1805
        _ErrorIf(True, self.ENODEHV, node,
1806
                 "hypervisor %s parameter verify failure (source %s): %s",
1807
                 hv_name, item, hv_result)
1808

    
1809
    test = nresult.get(constants.NV_NODESETUP,
1810
                       ["Missing NODESETUP results"])
1811
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1812
             "; ".join(test))
1813

    
1814
    return True
1815

    
1816
  def _VerifyNodeTime(self, ninfo, nresult,
1817
                      nvinfo_starttime, nvinfo_endtime):
1818
    """Check the node time.
1819

1820
    @type ninfo: L{objects.Node}
1821
    @param ninfo: the node to check
1822
    @param nresult: the remote results for the node
1823
    @param nvinfo_starttime: the start time of the RPC call
1824
    @param nvinfo_endtime: the end time of the RPC call
1825

1826
    """
1827
    node = ninfo.name
1828
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1829

    
1830
    ntime = nresult.get(constants.NV_TIME, None)
1831
    try:
1832
      ntime_merged = utils.MergeTime(ntime)
1833
    except (ValueError, TypeError):
1834
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1835
      return
1836

    
1837
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1838
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1839
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1840
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1841
    else:
1842
      ntime_diff = None
1843

    
1844
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1845
             "Node time diverges by at least %s from master node time",
1846
             ntime_diff)
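
  # Illustrative sketch (never called): the acceptance window used by
  # _VerifyNodeTime above.  A node clock is only flagged when it falls
  # outside [rpc_start - skew, rpc_end + skew], so the RPC round-trip time
  # itself never triggers a false positive.
  @staticmethod
  def _ExampleClockSkewOk(node_time, rpc_start, rpc_end):
    skew = constants.NODE_MAX_CLOCK_SKEW
    return (rpc_start - skew) <= node_time <= (rpc_end + skew)
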
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1849
    """Check the node LVM results.
1850

1851
    @type ninfo: L{objects.Node}
1852
    @param ninfo: the node to check
1853
    @param nresult: the remote results for the node
1854
    @param vg_name: the configured VG name
1855

1856
    """
1857
    if vg_name is None:
1858
      return
1859

    
1860
    node = ninfo.name
1861
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1862

    
1863
    # checks vg existence and size > 20G
1864
    vglist = nresult.get(constants.NV_VGLIST, None)
1865
    test = not vglist
1866
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1867
    if not test:
1868
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1869
                                            constants.MIN_VG_SIZE)
1870
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1871

    
1872
    # check pv names
1873
    pvlist = nresult.get(constants.NV_PVLIST, None)
1874
    test = pvlist is None
1875
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1876
    if not test:
1877
      # check that ':' is not present in PV names, since it's a
1878
      # special character for lvcreate (denotes the range of PEs to
1879
      # use on the PV)
1880
      for _, pvname, owner_vg in pvlist:
1881
        test = ":" in pvname
1882
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1883
                 " '%s' of VG '%s'", pvname, owner_vg)
1884

    
1885
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1886
    """Check the node bridges.
1887

1888
    @type ninfo: L{objects.Node}
1889
    @param ninfo: the node to check
1890
    @param nresult: the remote results for the node
1891
    @param bridges: the expected list of bridges
1892

1893
    """
1894
    if not bridges:
1895
      return
1896

    
1897
    node = ninfo.name
1898
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1899

    
1900
    missing = nresult.get(constants.NV_BRIDGES, None)
1901
    test = not isinstance(missing, list)
1902
    _ErrorIf(test, self.ENODENET, node,
1903
             "did not return valid bridge information")
1904
    if not test:
1905
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
1906
               utils.CommaJoin(sorted(missing)))
1907

    
1908
  def _VerifyNodeNetwork(self, ninfo, nresult):
1909
    """Check the node network connectivity results.
1910

1911
    @type ninfo: L{objects.Node}
1912
    @param ninfo: the node to check
1913
    @param nresult: the remote results for the node
1914

1915
    """
1916
    node = ninfo.name
1917
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1918

    
1919
    test = constants.NV_NODELIST not in nresult
1920
    _ErrorIf(test, self.ENODESSH, node,
1921
             "node hasn't returned node ssh connectivity data")
1922
    if not test:
1923
      if nresult[constants.NV_NODELIST]:
1924
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1925
          _ErrorIf(True, self.ENODESSH, node,
1926
                   "ssh communication with node '%s': %s", a_node, a_msg)
1927

    
1928
    test = constants.NV_NODENETTEST not in nresult
1929
    _ErrorIf(test, self.ENODENET, node,
1930
             "node hasn't returned node tcp connectivity data")
1931
    if not test:
1932
      if nresult[constants.NV_NODENETTEST]:
1933
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1934
        for anode in nlist:
1935
          _ErrorIf(True, self.ENODENET, node,
1936
                   "tcp communication with node '%s': %s",
1937
                   anode, nresult[constants.NV_NODENETTEST][anode])
1938

    
1939
    test = constants.NV_MASTERIP not in nresult
1940
    _ErrorIf(test, self.ENODENET, node,
1941
             "node hasn't returned node master IP reachability data")
1942
    if not test:
1943
      if not nresult[constants.NV_MASTERIP]:
1944
        if node == self.master_node:
1945
          msg = "the master node cannot reach the master IP (not configured?)"
1946
        else:
1947
          msg = "cannot reach the master IP"
1948
        _ErrorIf(True, self.ENODENET, node, msg)
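
  # Illustrative sketch (made-up values, never called): the network test
  # results consumed by _VerifyNodeNetwork above.  NV_NODELIST and
  # NV_NODENETTEST map unreachable peers to an error message and are empty
  # when everything is fine; NV_MASTERIP is a plain boolean.
  @staticmethod
  def _ExampleNetworkResults():
    return {
      constants.NV_NODELIST: {},
      constants.NV_NODENETTEST: {
        "node3.example.com": "failure connecting to the secondary IP",
        },
      constants.NV_MASTERIP: True,
      }
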
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1951
                      diskstatus):
1952
    """Verify an instance.
1953

1954
    This function checks to see if the required block devices are
1955
    available on the instance's node.
1956

1957
    """
1958
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1959
    node_current = instanceconfig.primary_node
1960

    
1961
    node_vol_should = {}
1962
    instanceconfig.MapLVsByNode(node_vol_should)
1963

    
1964
    for node in node_vol_should:
1965
      n_img = node_image[node]
1966
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1967
        # ignore missing volumes on offline or broken nodes
1968
        continue
1969
      for volume in node_vol_should[node]:
1970
        test = volume not in n_img.volumes
1971
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1972
                 "volume %s missing on node %s", volume, node)
1973

    
1974
    if instanceconfig.admin_up:
1975
      pri_img = node_image[node_current]
1976
      test = instance not in pri_img.instances and not pri_img.offline
1977
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1978
               "instance not running on its primary node %s",
1979
               node_current)
1980

    
1981
    diskdata = [(nname, success, status, idx)
1982
                for (nname, disks) in diskstatus.items()
1983
                for idx, (success, status) in enumerate(disks)]
1984

    
1985
    for nname, success, bdev_status, idx in diskdata:
1986
      # the 'ghost node' construction in Exec() ensures that we have a
1987
      # node here
1988
      snode = node_image[nname]
1989
      bad_snode = snode.ghost or snode.offline
1990
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
1991
               self.EINSTANCEFAULTYDISK, instance,
1992
               "couldn't retrieve status for disk/%s on %s: %s",
1993
               idx, nname, bdev_status)
1994
      _ErrorIf((instanceconfig.admin_up and success and
1995
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1996
               self.EINSTANCEFAULTYDISK, instance,
1997
               "disk/%s on %s is faulty", idx, nname)
1998

    
1999
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2000
    """Verify if there are any unknown volumes in the cluster.
2001

2002
    The .os, .swap and backup volumes are ignored. All other volumes are
2003
    reported as unknown.
2004

2005
    @type reserved: L{ganeti.utils.FieldSet}
2006
    @param reserved: a FieldSet of reserved volume names
2007

2008
    """
2009
    for node, n_img in node_image.items():
2010
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2011
        # skip non-healthy nodes
2012
        continue
2013
      for volume in n_img.volumes:
2014
        test = ((node not in node_vol_should or
2015
                volume not in node_vol_should[node]) and
2016
                not reserved.Matches(volume))
2017
        self._ErrorIf(test, self.ENODEORPHANLV, node,
2018
                      "volume %s is unknown", volume)
2019

    
2020
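
  # Illustrative sketch of the orphan check above (made-up values, never
  # called): a volume is flagged unless the configuration expects it on
  # that node or its name matches one of the reserved patterns from
  # cluster.reserved_lvs.
  @staticmethod
  def _ExampleOrphanCheck():
    node_vol_should = {"node1.example.com": ["xenvg/disk0"]}
    reserved = utils.FieldSet("xenvg/swap.*")
    node = "node1.example.com"
    volume = "xenvg/old-disk"
    is_orphan = (volume not in node_vol_should.get(node, []) and
                 not reserved.Matches(volume))
    return is_orphan  # True: would be reported as ENODEORPHANLV
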
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2021
    """Verify N+1 Memory Resilience.
2022

2023
    Check that if one single node dies we can still start all the
2024
    instances it was primary for.
2025

2026
    """
2027
    cluster_info = self.cfg.GetClusterInfo()
2028
    for node, n_img in node_image.items():
2029
      # This code checks that every node which is now listed as
2030
      # secondary has enough memory to host all instances it is
2031
      # supposed to should a single other node in the cluster fail.
2032
      # FIXME: not ready for failover to an arbitrary node
2033
      # FIXME: does not support file-backed instances
2034
      # WARNING: we currently take into account down instances as well
2035
      # as up ones, considering that even if they're down someone
2036
      # might want to start them even in the event of a node failure.
2037
      if n_img.offline:
2038
        # we're skipping offline nodes from the N+1 warning, since
2039
        # most likely we don't have good memory information from them;
2040
        # we already list instances living on such nodes, and that's
2041
        # enough warning
2042
        continue
2043
      for prinode, instances in n_img.sbp.items():
2044
        needed_mem = 0
2045
        for instance in instances:
2046
          bep = cluster_info.FillBE(instance_cfg[instance])
2047
          if bep[constants.BE_AUTO_BALANCE]:
2048
            needed_mem += bep[constants.BE_MEMORY]
2049
        test = n_img.mfree < needed_mem
2050
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
2052
                      " should node %s fail (%dMiB needed, %dMiB available)",
2053
                      prinode, needed_mem, n_img.mfree)
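
  # Illustrative sketch of the computation above (made-up numbers, never
  # called): for every primary node whose instances would fail over to
  # this node, sum the memory of its auto-balanced instances and compare
  # that with the free memory the hypervisor reported for this node.
  @staticmethod
  def _ExampleNPlusOne():
    mfree = 4096  # MiB reported free on the candidate secondary
    sbp = {"nodeA.example.com": [("inst1", 2048, True),
                                 ("inst2", 4096, False)]}
    for prinode, insts in sbp.items():
      needed = sum(mem for (_, mem, auto_balance) in insts if auto_balance)
      if mfree < needed:
        return prinode  # would be flagged as ENODEN1
    return None
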
  @classmethod
2056
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2057
                   (files_all, files_all_opt, files_mc, files_vm)):
2058
    """Verifies file checksums collected from all nodes.
2059

2060
    @param errorif: Callback for reporting errors
2061
    @param nodeinfo: List of L{objects.Node} objects
2062
    @param master_node: Name of master node
2063
    @param all_nvinfo: RPC results
2064

2065
    """
2066
    node_names = frozenset(node.name for node in nodeinfo if not node.offline)
2067

    
2068
    assert master_node in node_names
2069
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
2070
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
2071
           "Found file listed in more than one file list"
2072

    
2073
    # Define functions determining which nodes to consider for a file
2074
    file2nodefn = dict([(filename, fn)
2075
      for (files, fn) in [(files_all, None),
2076
                          (files_all_opt, None),
2077
                          (files_mc, lambda node: (node.master_candidate or
2078
                                                   node.name == master_node)),
2079
                          (files_vm, lambda node: node.vm_capable)]
2080
      for filename in files])
2081

    
2082
    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
2083

    
2084
    for node in nodeinfo:
2085
      if node.offline:
2086
        continue
2087

    
2088
      nresult = all_nvinfo[node.name]
2089

    
2090
      if nresult.fail_msg or not nresult.payload:
2091
        node_files = None
2092
      else:
2093
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2094

    
2095
      test = not (node_files and isinstance(node_files, dict))
2096
      errorif(test, cls.ENODEFILECHECK, node.name,
2097
              "Node did not return file checksum data")
2098
      if test:
2099
        continue
2100

    
2101
      for (filename, checksum) in node_files.items():
2102
        # Check if the file should be considered for a node
2103
        fn = file2nodefn[filename]
2104
        if fn is None or fn(node):
2105
          fileinfo[filename].setdefault(checksum, set()).add(node.name)
2106

    
2107
    for (filename, checksums) in fileinfo.items():
2108
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2109

    
2110
      # Nodes having the file
2111
      with_file = frozenset(node_name
2112
                            for nodes in fileinfo[filename].values()
2113
                            for node_name in nodes)
2114

    
2115
      # Nodes missing file
2116
      missing_file = node_names - with_file
2117

    
2118
      if filename in files_all_opt:
2119
        # All or no nodes
2120
        errorif(missing_file and missing_file != node_names,
2121
                cls.ECLUSTERFILECHECK, None,
2122
                "File %s is optional, but it must exist on all or no"
2123
                " nodes (not found on %s)",
2124
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2125
      else:
2126
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
2127
                "File %s is missing from node(s) %s", filename,
2128
                utils.CommaJoin(utils.NiceSort(missing_file)))
2129

    
2130
      # See if there are multiple versions of the file
2131
      test = len(checksums) > 1
2132
      if test:
2133
        variants = ["variant %s on %s" %
2134
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2135
                    for (idx, (checksum, nodes)) in
2136
                      enumerate(sorted(checksums.items()))]
2137
      else:
2138
        variants = []
2139

    
2140
      errorif(test, cls.ECLUSTERFILECHECK, None,
2141
              "File %s found with %s different checksums (%s)",
2142
              filename, len(checksums), "; ".join(variants))
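
  # Illustrative sketch of the structure checked by _VerifyFiles above
  # (file names, checksums and node names are made up): per file, each
  # observed checksum maps to the set of nodes reporting it; more than one
  # checksum for the same file means the file diverges across nodes.
  @staticmethod
  def _ExampleFileInfo():
    return {
      "/var/lib/ganeti/example-a.conf": {
        "0123456789abcdef0123": set(["node1.example.com",
                                     "node2.example.com"]),
        },
      "/var/lib/ganeti/example-b.conf": {
        "aaaaaaaaaaaaaaaaaaaa": set(["node1.example.com"]),
        "bbbbbbbbbbbbbbbbbbbb": set(["node2.example.com"]),  # divergent
        },
      }
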
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2145
                      drbd_map):
    """Verifies the node DRBD status.
2147

2148
    @type ninfo: L{objects.Node}
2149
    @param ninfo: the node to check
2150
    @param nresult: the remote results for the node
2151
    @param instanceinfo: the dict of instances
2152
    @param drbd_helper: the configured DRBD usermode helper
2153
    @param drbd_map: the DRBD map as returned by
2154
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2155

2156
    """
2157
    node = ninfo.name
2158
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2159

    
2160
    if drbd_helper:
2161
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2162
      test = (helper_result is None)
2163
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
2164
               "no drbd usermode helper returned")
2165
      if helper_result:
2166
        status, payload = helper_result
2167
        test = not status
2168
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2169
                 "drbd usermode helper check unsuccessful: %s", payload)
2170
        test = status and (payload != drbd_helper)
2171
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2172
                 "wrong drbd usermode helper: %s", payload)
2173

    
2174
    # compute the DRBD minors
2175
    node_drbd = {}
2176
    for minor, instance in drbd_map[node].items():
2177
      test = instance not in instanceinfo
2178
      _ErrorIf(test, self.ECLUSTERCFG, None,
2179
               "ghost instance '%s' in temporary DRBD map", instance)
2180
        # ghost instance should not be running, but otherwise we
2181
        # don't give double warnings (both ghost instance and
2182
        # unallocated minor in use)
2183
      if test:
2184
        node_drbd[minor] = (instance, False)
2185
      else:
2186
        instance = instanceinfo[instance]
2187
        node_drbd[minor] = (instance.name, instance.admin_up)
2188

    
2189
    # and now check them
2190
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2191
    test = not isinstance(used_minors, (tuple, list))
2192
    _ErrorIf(test, self.ENODEDRBD, node,
2193
             "cannot parse drbd status file: %s", str(used_minors))
2194
    if test:
2195
      # we cannot check drbd status
2196
      return
2197

    
2198
    for minor, (iname, must_exist) in node_drbd.items():
2199
      test = minor not in used_minors and must_exist
2200
      _ErrorIf(test, self.ENODEDRBD, node,
2201
               "drbd minor %d of instance %s is not active", minor, iname)
2202
    for minor in used_minors:
2203
      test = minor not in node_drbd
2204
      _ErrorIf(test, self.ENODEDRBD, node,
2205
               "unallocated drbd minor %d is in use", minor)
2206

    
2207
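
  # Illustrative sketch (made-up data, never called): the two views
  # compared at the end of _VerifyNodeDrbd above.  node_drbd is derived
  # from the configuration (which minor should serve which instance and
  # whether that instance is up), used_minors is what the node reports.
  @staticmethod
  def _ExampleDrbdViews():
    node_drbd = {0: ("inst1.example.com", True),   # must be active
                 1: ("inst2.example.com", False)}  # instance is down
    used_minors = [0, 2]
    missing = [m for (m, (_, must)) in node_drbd.items()
               if must and m not in used_minors]
    unallocated = [m for m in used_minors if m not in node_drbd]
    return (missing, unallocated)  # ([], [2]) for the data above
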
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2208
    """Builds the node OS structures.
2209

2210
    @type ninfo: L{objects.Node}
2211
    @param ninfo: the node to check
2212
    @param nresult: the remote results for the node
2213
    @param nimg: the node image object
2214

2215
    """
2216
    node = ninfo.name
2217
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2218

    
2219
    remote_os = nresult.get(constants.NV_OSLIST, None)
2220
    test = (not isinstance(remote_os, list) or
2221
            not compat.all(isinstance(v, list) and len(v) == 7
2222
                           for v in remote_os))
2223

    
2224
    _ErrorIf(test, self.ENODEOS, node,
2225
             "node hasn't returned valid OS data")
2226

    
2227
    nimg.os_fail = test
2228

    
2229
    if test:
2230
      return
2231

    
2232
    os_dict = {}
2233

    
2234
    for (name, os_path, status, diagnose,
2235
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2236

    
2237
      if name not in os_dict:
2238
        os_dict[name] = []
2239

    
2240
      # parameters is a list of lists instead of list of tuples due to
2241
      # JSON lacking a real tuple type, fix it:
2242
      parameters = [tuple(v) for v in parameters]
2243
      os_dict[name].append((os_path, status, diagnose,
2244
                            set(variants), set(parameters), set(api_ver)))
2245

    
2246
    nimg.oslist = os_dict
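
  # Illustrative sketch of the structure stored in nimg.oslist above (all
  # values are made up): one list per OS name, holding a (path, status,
  # diagnose message, variants, parameters, api_versions) tuple for every
  # location in which the OS was found on the node.
  @staticmethod
  def _ExampleOsList():
    return {
      "debootstrap": [
        ("/srv/ganeti/os/debootstrap", True, "",
         set(["default"]), set(), set([20])),
        ],
      }
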
  def _VerifyNodeOS(self, ninfo, nimg, base):
2249
    """Verifies the node OS list.
2250

2251
    @type ninfo: L{objects.Node}
2252
    @param ninfo: the node to check
2253
    @param nimg: the node image object
2254
    @param base: the 'template' node we match against (e.g. from the master)
2255

2256
    """
2257
    node = ninfo.name
2258
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2259

    
2260
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2261

    
2262
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2263
    for os_name, os_data in nimg.oslist.items():
2264
      assert os_data, "Empty OS status for OS %s?!" % os_name
2265
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2266
      _ErrorIf(not f_status, self.ENODEOS, node,
2267
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2268
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
2269
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2270
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2271
      # comparisons with the 'base' image
2272
      test = os_name not in base.oslist
2273
      _ErrorIf(test, self.ENODEOS, node,
2274
               "Extra OS %s not present on reference node (%s)",
2275
               os_name, base.name)
2276
      if test:
2277
        continue
2278
      assert base.oslist[os_name], "Base node has empty OS status?"
2279
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2280
      if not b_status:
2281
        # base OS is invalid, skipping
2282
        continue
2283
      for kind, a, b in [("API version", f_api, b_api),
2284
                         ("variants list", f_var, b_var),
2285
                         ("parameters", beautify_params(f_param),
2286
                          beautify_params(b_param))]:
2287
        _ErrorIf(a != b, self.ENODEOS, node,
2288
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2289
                 kind, os_name, base.name,
2290
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2291

    
2292
    # check any missing OSes
2293
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2294
    _ErrorIf(missing, self.ENODEOS, node,
2295
             "OSes present on reference node %s but missing on this node: %s",
2296
             base.name, utils.CommaJoin(missing))
2297

    
2298
  def _VerifyOob(self, ninfo, nresult):
2299
    """Verifies out of band functionality of a node.
2300

2301
    @type ninfo: L{objects.Node}
2302
    @param ninfo: the node to check
2303
    @param nresult: the remote results for the node
2304

2305
    """
2306
    node = ninfo.name
2307
    # We just have to verify the paths on master and/or master candidates
2308
    # as the oob helper is invoked on the master
2309
    if ((ninfo.master_candidate or ninfo.master_capable) and
2310
        constants.NV_OOB_PATHS in nresult):
2311
      for path_result in nresult[constants.NV_OOB_PATHS]:
2312
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
2313

    
2314
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2315
    """Verifies and updates the node volume data.
2316

2317
    This function will update a L{NodeImage}'s internal structures
2318
    with data from the remote call.
2319

2320
    @type ninfo: L{objects.Node}
2321
    @param ninfo: the node to check
2322
    @param nresult: the remote results for the node
2323
    @param nimg: the node image object
2324
    @param vg_name: the configured VG name
2325

2326
    """
2327
    node = ninfo.name
2328
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2329

    
2330
    nimg.lvm_fail = True
2331
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2332
    if vg_name is None:
2333
      pass
2334
    elif isinstance(lvdata, basestring):
2335
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2336
               utils.SafeEncode(lvdata))
2337
    elif not isinstance(lvdata, dict):
2338
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2339
    else:
2340
      nimg.volumes = lvdata
2341
      nimg.lvm_fail = False
2342

    
2343
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2344
    """Verifies and updates the node instance list.
2345

2346
    If the listing was successful, then updates this node's instance
2347
    list. Otherwise, it marks the RPC call as failed for the instance
2348
    list key.
2349

2350
    @type ninfo: L{objects.Node}
2351
    @param ninfo: the node to check
2352
    @param nresult: the remote results for the node
2353
    @param nimg: the node image object
2354

2355
    """
2356
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2357
    test = not isinstance(idata, list)
2358
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2359
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2360
    if test:
2361
      nimg.hyp_fail = True
2362
    else:
2363
      nimg.instances = idata
2364

    
2365
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2366
    """Verifies and computes a node information map
2367

2368
    @type ninfo: L{objects.Node}
2369
    @param ninfo: the node to check
2370
    @param nresult: the remote results for the node
2371
    @param nimg: the node image object
2372
    @param vg_name: the configured VG name
2373

2374
    """
2375
    node = ninfo.name
2376
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2377

    
2378
    # try to read free memory (from the hypervisor)
2379
    hv_info = nresult.get(constants.NV_HVINFO, None)
2380
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2381
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2382
    if not test:
2383
      try:
2384
        nimg.mfree = int(hv_info["memory_free"])
2385
      except (ValueError, TypeError):
2386
        _ErrorIf(True, self.ENODERPC, node,
2387
                 "node returned invalid nodeinfo, check hypervisor")
2388

    
2389
    # FIXME: devise a free space model for file based instances as well
2390
    if vg_name is not None:
2391
      test = (constants.NV_VGLIST not in nresult or
2392
              vg_name not in nresult[constants.NV_VGLIST])
2393
      _ErrorIf(test, self.ENODELVM, node,
2394
               "node didn't return data for the volume group '%s'"
2395
               " - it is either missing or broken", vg_name)
2396
      if not test:
2397
        try:
2398
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2399
        except (ValueError, TypeError):
2400
          _ErrorIf(True, self.ENODERPC, node,
2401
                   "node returned invalid LVM info, check LVM status")
2402

    
2403
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2404
    """Gets per-disk status information for all instances.
2405

2406
    @type nodelist: list of strings
2407
    @param nodelist: Node names
2408
    @type node_image: dict of (name, L{objects.Node})
2409
    @param node_image: Node objects
2410
    @type instanceinfo: dict of (name, L{objects.Instance})
2411
    @param instanceinfo: Instance objects
2412
    @rtype: {instance: {node: [(success, payload)]}}
2413
    @return: a dictionary of per-instance dictionaries with nodes as
2414
        keys and disk information as values; the disk information is a
2415
        list of tuples (success, payload)
2416

2417
    """
2418
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2419

    
2420
    node_disks = {}
2421
    node_disks_devonly = {}
2422
    diskless_instances = set()
2423
    diskless = constants.DT_DISKLESS
2424

    
2425
    for nname in nodelist:
2426
      node_instances = list(itertools.chain(node_image[nname].pinst,
2427
                                            node_image[nname].sinst))
2428
      diskless_instances.update(inst for inst in node_instances
2429
                                if instanceinfo[inst].disk_template == diskless)
2430
      disks = [(inst, disk)
2431
               for inst in node_instances
2432
               for disk in instanceinfo[inst].disks]
2433

    
2434
      if not disks:
2435
        # No need to collect data
2436
        continue
2437

    
2438
      node_disks[nname] = disks
2439

    
2440
      # Creating copies as SetDiskID below will modify the objects and that can
2441
      # lead to incorrect data returned from nodes
2442
      devonly = [dev.Copy() for (_, dev) in disks]
2443

    
2444
      for dev in devonly:
2445
        self.cfg.SetDiskID(dev, nname)
2446

    
2447
      node_disks_devonly[nname] = devonly
2448

    
2449
    assert len(node_disks) == len(node_disks_devonly)
2450

    
2451
    # Collect data from all nodes with disks
2452
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2453
                                                          node_disks_devonly)
2454

    
2455
    assert len(result) == len(node_disks)
2456

    
2457
    instdisk = {}
2458

    
2459
    for (nname, nres) in result.items():
2460
      disks = node_disks[nname]
2461

    
2462
      if nres.offline:
2463
        # No data from this node
2464
        data = len(disks) * [(False, "node offline")]
2465
      else:
2466
        msg = nres.fail_msg
2467
        _ErrorIf(msg, self.ENODERPC, nname,
2468
                 "while getting disk information: %s", msg)
2469
        if msg:
2470
          # No data from this node
2471
          data = len(disks) * [(False, msg)]
2472
        else:
2473
          data = []
2474
          for idx, i in enumerate(nres.payload):
2475
            if isinstance(i, (tuple, list)) and len(i) == 2:
2476
              data.append(i)
2477
            else:
2478
              logging.warning("Invalid result from node %s, entry %d: %s",
2479
                              nname, idx, i)
2480
              data.append((False, "Invalid result from the remote node"))
2481

    
2482
      for ((inst, _), status) in zip(disks, data):
2483
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2484

    
2485
    # Add empty entries for diskless instances.
2486
    for inst in diskless_instances:
2487
      assert inst not in instdisk
2488
      instdisk[inst] = {}
2489

    
2490
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2491
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2492
                      compat.all(isinstance(s, (tuple, list)) and
2493
                                 len(s) == 2 for s in statuses)
2494
                      for inst, nnames in instdisk.items()
2495
                      for nname, statuses in nnames.items())
2496
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2497

    
2498
    return instdisk
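
  # Illustrative sketch of the return value above (made-up names): one
  # entry per instance, mapping each involved node to a list with one
  # (success, payload) pair per disk; diskless instances map to {}.
  #
  #   instdisk = {
  #     "inst1.example.com": {
  #       "node1.example.com": [(True, <block device status>),
  #                             (False, "node offline")],
  #       },
  #     "diskless1.example.com": {},
  #     }
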
  def BuildHooksEnv(self):
2501
    """Build hooks env.
2502

2503
    Cluster-Verify hooks are run in the post phase only; their failure makes
    their output be logged in the verify output and the verification fail.
2505

2506
    """
2507
    env = {
2508
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2509
      }
2510

    
2511
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2512
               for node in self.my_node_info.values())
2513

    
2514
    return env
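
  # Illustrative sketch of the environment built above (made-up values):
  # the cluster tags plus one NODE_TAGS_<name> entry per node in the
  # verified group.
  #
  #   {
  #     "CLUSTER_TAGS": "prod dc1",
  #     "NODE_TAGS_node1.example.com": "rack4",
  #   }
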
  def BuildHooksNodes(self):
2517
    """Build hooks nodes.
2518

2519
    """
2520
    return ([], self.my_node_names)
2521

    
2522
  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.
2524

2525
    """
2526
    # This method has too many local variables. pylint: disable-msg=R0914
2527

    
2528
    if not self.my_node_names:
2529
      # empty node group
2530
      feedback_fn("* Empty node group, skipping verification")
2531
      return True
2532

    
2533
    self.bad = False
2534
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2535
    verbose = self.op.verbose
2536
    self._feedback_fn = feedback_fn
2537

    
2538
    vg_name = self.cfg.GetVGName()
2539
    drbd_helper = self.cfg.GetDRBDHelper()
2540
    cluster = self.cfg.GetClusterInfo()
2541
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2542
    hypervisors = cluster.enabled_hypervisors
2543
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2544

    
2545
    i_non_redundant = [] # Non redundant instances
2546
    i_non_a_balanced = [] # Non auto-balanced instances
2547
    n_offline = 0 # Count of offline nodes
2548
    n_drained = 0 # Count of nodes being drained
2549
    node_vol_should = {}
2550

    
2551
    # FIXME: verify OS list
2552

    
2553
    # File verification
2554
    filemap = _ComputeAncillaryFiles(cluster, False)
2555

    
2556
    # do local checksums
2557
    master_node = self.master_node = self.cfg.GetMasterNode()
2558
    master_ip = self.cfg.GetMasterIP()
2559

    
2560
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2561

    
2562
    # We will make nodes contact all nodes in their group, and one node from
2563
    # every other group.
2564
    # TODO: should it be a *random* node, different every time?
2565
    online_nodes = [node.name for node in node_data_list if not node.offline]
2566
    other_group_nodes = {}
2567

    
2568
    for name in sorted(self.all_node_info):
2569
      node = self.all_node_info[name]
2570
      if (node.group not in other_group_nodes
2571
          and node.group != self.group_uuid
2572
          and not node.offline):
2573
        other_group_nodes[node.group] = node.name
2574

    
2575
    node_verify_param = {
2576
      constants.NV_FILELIST:
2577
        utils.UniqueSequence(filename
2578
                             for files in filemap
2579
                             for filename in files),
2580
      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
2581
      constants.NV_HYPERVISOR: hypervisors,
2582
      constants.NV_HVPARAMS:
2583
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2584
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2585
                                 for node in node_data_list
2586
                                 if not node.offline],
2587
      constants.NV_INSTANCELIST: hypervisors,
2588
      constants.NV_VERSION: None,
2589
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2590
      constants.NV_NODESETUP: None,
2591
      constants.NV_TIME: None,
2592
      constants.NV_MASTERIP: (master_node, master_ip),
2593
      constants.NV_OSLIST: None,
2594
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2595
      }
2596

    
2597
    if vg_name is not None:
2598
      node_verify_param[constants.NV_VGLIST] = None
2599
      node_verify_param[constants.NV_LVLIST] = vg_name
2600
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2601
      node_verify_param[constants.NV_DRBDLIST] = None
2602

    
2603
    if drbd_helper:
2604
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2605

    
2606
    # bridge checks
2607
    # FIXME: this needs to be changed per node-group, not cluster-wide
2608
    bridges = set()
2609
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2610
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2611
      bridges.add(default_nicpp[constants.NIC_LINK])
2612
    for instance in self.my_inst_info.values():
2613
      for nic in instance.nics:
2614
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2615
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2616
          bridges.add(full_nic[constants.NIC_LINK])
2617

    
2618
    if bridges:
2619
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2620

    
2621
    # Build our expected cluster state
2622
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2623
                                                 name=node.name,
2624
                                                 vm_capable=node.vm_capable))
2625
                      for node in node_data_list)
2626

    
2627
    # Gather OOB paths
2628
    oob_paths = []
2629
    for node in self.all_node_info.values():
2630
      path = _SupportsOob(self.cfg, node)
2631
      if path and path not in oob_paths:
2632
        oob_paths.append(path)
2633

    
2634
    if oob_paths:
2635
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2636

    
2637
    for instance in self.my_inst_names:
2638
      inst_config = self.my_inst_info[instance]
2639

    
2640
      for nname in inst_config.all_nodes:
2641
        if nname not in node_image:
2642
          gnode = self.NodeImage(name=nname)
2643
          gnode.ghost = (nname not in self.all_node_info)
2644
          node_image[nname] = gnode
2645

    
2646
      inst_config.MapLVsByNode(node_vol_should)
2647

    
2648
      pnode = inst_config.primary_node
2649
      node_image[pnode].pinst.append(instance)
2650

    
2651
      for snode in inst_config.secondary_nodes:
2652
        nimg = node_image[snode]
2653
        nimg.sinst.append(instance)
2654
        if pnode not in nimg.sbp:
2655
          nimg.sbp[pnode] = []
2656
        nimg.sbp[pnode].append(instance)
2657

    
2658
    # At this point, we have the in-memory data structures complete,
2659
    # except for the runtime information, which we'll gather next
2660

    
2661
    # Due to the way our RPC system works, exact response times cannot be
2662
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2663
    # time before and after executing the request, we can at least have a time
2664
    # window.
2665
    nvinfo_starttime = time.time()
2666
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2667
                                           node_verify_param,
2668
                                           self.cfg.GetClusterName())
2669
    nvinfo_endtime = time.time()
2670

    
2671
    if self.extra_lv_nodes and vg_name is not None:
2672
      extra_lv_nvinfo = \
2673
          self.rpc.call_node_verify(self.extra_lv_nodes,
2674
                                    {constants.NV_LVLIST: vg_name},
2675
                                    self.cfg.GetClusterName())
2676
    else:
2677
      extra_lv_nvinfo = {}
2678

    
2679
    all_drbd_map = self.cfg.ComputeDRBDMap()
2680

    
2681
    feedback_fn("* Gathering disk information (%s nodes)" %
2682
                len(self.my_node_names))
2683
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2684
                                     self.my_inst_info)
2685

    
2686
    feedback_fn("* Verifying configuration file consistency")
2687

    
2688
    # If not all nodes are being checked, we need to make sure the master node
2689
    # and a non-checked vm_capable node are in the list.
2690
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2691
    if absent_nodes:
2692
      vf_nvinfo = all_nvinfo.copy()
2693
      vf_node_info = list(self.my_node_info.values())
2694
      additional_nodes = []
2695
      if master_node not in self.my_node_info:
2696
        additional_nodes.append(master_node)
2697
        vf_node_info.append(self.all_node_info[master_node])
2698
      # Add the first vm_capable node we find which is not included
2699
      for node in absent_nodes:
2700
        nodeinfo = self.all_node_info[node]
2701
        if nodeinfo.vm_capable and not nodeinfo.offline:
2702
          additional_nodes.append(node)
2703
          vf_node_info.append(self.all_node_info[node])
2704
          break
2705
      key = constants.NV_FILELIST
2706
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2707
                                                 {key: node_verify_param[key]},
2708
                                                 self.cfg.GetClusterName()))
2709
    else:
2710
      vf_nvinfo = all_nvinfo
2711
      vf_node_info = self.my_node_info.values()
2712

    
2713
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2714

    
2715
    feedback_fn("* Verifying node status")
2716

    
2717
    refos_img = None
2718

    
2719
    for node_i in node_data_list:
2720
      node = node_i.name
2721
      nimg = node_image[node]
2722

    
2723
      if node_i.offline:
2724
        if verbose:
2725
          feedback_fn("* Skipping offline node %s" % (node,))
2726
        n_offline += 1
2727
        continue
2728

    
2729
      if node == master_node:
2730
        ntype = "master"
2731
      elif node_i.master_candidate:
2732
        ntype = "master candidate"
2733
      elif node_i.drained:
2734
        ntype = "drained"
2735
        n_drained += 1
2736
      else:
2737
        ntype = "regular"
2738
      if verbose:
2739
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2740

    
2741
      msg = all_nvinfo[node].fail_msg
2742
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2743
      if msg:
2744
        nimg.rpc_fail = True
2745
        continue
2746

    
2747
      nresult = all_nvinfo[node].payload
2748

    
2749
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2750
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2751
      self._VerifyNodeNetwork(node_i, nresult)
2752
      self._VerifyOob(node_i, nresult)
2753

    
2754
      if nimg.vm_capable:
2755
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2756
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2757
                             all_drbd_map)
2758

    
2759
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2760
        self._UpdateNodeInstances(node_i, nresult, nimg)
2761
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2762
        self._UpdateNodeOS(node_i, nresult, nimg)
2763

    
2764
        if not nimg.os_fail:
2765
          if refos_img is None:
2766
            refos_img = nimg
2767
          self._VerifyNodeOS(node_i, nimg, refos_img)
2768
        self._VerifyNodeBridges(node_i, nresult, bridges)
2769

    
2770
        # Check whether all running instances are primary for the node. (This
2771
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               self.EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance has offline secondary node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = 0

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
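  # Illustrative note (not part of the LU): on a cluster with two node groups,
  # the ResultWithJobs returned above makes the master submit two single-opcode
  # jobs, roughly:
  #
  #   [[opcodes.OpGroupVerifyDisks(group_name="group1")],
  #    [opcodes.OpGroupVerifyDisks(group_name="group2")]]
  #
  # The group identifiers here are hypothetical; the IDs of the submitted jobs
  # end up in the opcode result.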


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    for (instance_name, inst) in self.instances.items():
      assert owned_nodes.issuperset(inst.all_nodes), \
        "Instance %s's nodes changed while we kept the lock" % instance_name

      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
                                             owned_groups)

      assert self.group_uuid in inst_groups, \
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(key)

    return (res_nodes, list(res_instances), res_missing)
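  # Illustrative sketch of the return value documented above, using purely
  # hypothetical node, instance and volume names:
  #
  #   ({"node3.example.com": "rpc error: connection refused"},
  #    ["instance1.example.com"],
  #    {"instance2.example.com": [("node2.example.com", "xenvg/disk0_data")]})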


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
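  # Illustrative note with hypothetical sizes: for a DRBD8 disk recorded at
  # 10240 whose data child (children[0]) is recorded at 10112, the method
  # above bumps the child to 10240 and returns True, telling the caller
  # (Exec below) that the configuration needs to be written back.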

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
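  # Note on the conversion above: the reported sizes are in bytes while
  # disk.size in the configuration is kept in MiB, hence "size = size >> 20"
  # (divide by 2**20).  A hypothetical element of the returned list would be
  # ("instance1.example.com", 0, 10240), i.e. (instance name, disk index,
  # new size in MiB).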


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_stop_master(master, False)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_start_master(master, False, False)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)
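# Illustrative usage sketch (hypothetical node names): a logical unit "lu"
# would push a single ancillary file to a couple of nodes with
#
#   _UploadHelper(lu, ["node2.example.com", "node3.example.com"],
#                 constants.ETC_HOSTS)
#
# Upload failures are only logged as per-node warnings; the helper never
# raises, and files missing on the master are silently skipped.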


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)
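# Rough summary of the tuple computed above (contents depend on the cluster
# configuration; this only restates the sets built in this function):
#
#   files_all     - known_hosts file, confd HMAC key, cluster domain secret,
#                   plus certificates and ssconf files when redist is False
#   files_all_opt - RAPI users file (must exist on all nodes or on none)
#   files_mc      - cluster configuration file (only when redist is False)
#   files_vm      - ancillary files of the enabled hypervisors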


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
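# Note on the polling loop above: RPC failures are retried up to 10 times with
# a 6 second pause before giving up; while any disk still reports a sync
# percentage, the loop sleeps min(60, estimated_time) seconds between polls;
# and a "done but degraded" result is re-checked up to 10 more times, one
# second apart, before the final degraded/not-degraded verdict is returned.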


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
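# Illustrative usage sketch (hypothetical instance and node): to check only
# the local storage state of a DRBD device on its secondary node, a caller
# would pass ldisk=True, e.g.
#
#   ok = _CheckDiskConsistency(lu, instance.disks[0], "node2.example.com",
#                              False, ldisk=True)
#
# With ldisk=False (the default) the overall is_degraded status is checked
# instead, as described in the docstring above.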


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(utils.NiceSort(self.nodes,
                                              key=lambda node: node.name)):
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))
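  # Summary of the payload shapes accepted above (derived from the checks in
  # _CheckPayload, not from any OOB protocol document):
  #   - health:               a list of (item, status) pairs, with each status
  #                           taken from constants.OOB_STATUSES
  #   - power-status:         a dict, from which Exec reads the
  #                           constants.OOB_POWER_STATUS_POWERED entry
  #   - power-on/off/cycle:   no payload at all (None)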

class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():