
root / lib / cmdlib.py @ c85b15c1


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import logging
36
import copy
37
import OpenSSL
38
import socket
39
import tempfile
40
import shutil
41
import itertools
42
import operator
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import query
58
from ganeti import qlang
59
from ganeti import opcodes
60
from ganeti import ht
61
from ganeti import runtime
62

    
63
import ganeti.masterd.instance # pylint: disable=W0611
64

    
65

    
66
class ResultWithJobs:
67
  """Data container for LU results with jobs.
68

69
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
70
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
71
  contained in the C{jobs} attribute and include the job IDs in the opcode
72
  result.
73

74
  """
75
  def __init__(self, jobs, **kwargs):
76
    """Initializes this class.
77

78
    Additional return values can be specified as keyword arguments.
79

80
    @type jobs: list of lists of L{opcodes.OpCode}
81
    @param jobs: A list of lists of opcode objects
82

83
    """
84
    self.jobs = jobs
85
    self.other = kwargs
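# Example (an illustrative sketch, not part of the original module): an
# Exec method can hand follow-up jobs to the processor by wrapping them in
# ResultWithJobs; each inner list is one job, each element one opcode, and
# extra keyword arguments end up in the C{other} attribute.
#
#   def Exec(self, feedback_fn):
#     jobs = [[opcodes.OpClusterVerifyGroup(group_name=group)]
#             for group in self.cfg.GetNodeGroupList()]
#     return ResultWithJobs(jobs, submitted=len(jobs))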
86

    
87

    
88
class LogicalUnit(object):
89
  """Logical Unit base class.
90

91
  Subclasses must follow these rules:
92
    - implement ExpandNames
93
    - implement CheckPrereq (except when tasklets are used)
94
    - implement Exec (except when tasklets are used)
95
    - implement BuildHooksEnv
96
    - implement BuildHooksNodes
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
105

106
  """
107
  HPATH = None
108
  HTYPE = None
109
  REQ_BGL = True
110

    
111
  def __init__(self, processor, op, context, rpc):
112
    """Constructor for LogicalUnit.
113

114
    This needs to be overridden in derived classes in order to check op
115
    validity.
116

117
    """
118
    self.proc = processor
119
    self.op = op
120
    self.cfg = context.cfg
121
    self.glm = context.glm
122
    # readability alias
123
    self.owned_locks = context.glm.list_owned
124
    self.context = context
125
    self.rpc = rpc
126
    # Dicts used to declare locking needs to mcpu
127
    self.needed_locks = None
128
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
129
    self.add_locks = {}
130
    self.remove_locks = {}
131
    # Used to force good behavior when calling helper functions
132
    self.recalculate_locks = {}
133
    # logging
134
    self.Log = processor.Log # pylint: disable=C0103
135
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
136
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
137
    self.LogStep = processor.LogStep # pylint: disable=C0103
138
    # support for dry-run
139
    self.dry_run_result = None
140
    # support for generic debug attribute
141
    if (not hasattr(self.op, "debug_level") or
142
        not isinstance(self.op.debug_level, int)):
143
      self.op.debug_level = 0
144

    
145
    # Tasklets
146
    self.tasklets = None
147

    
148
    # Validate opcode parameters and set defaults
149
    self.op.Validate(True)
150

    
151
    self.CheckArguments()
152

    
153
  def CheckArguments(self):
154
    """Check syntactic validity for the opcode arguments.
155

156
    This method is for doing a simple syntactic check and ensuring the
157
    validity of opcode parameters, without any cluster-related
158
    checks. While the same can be accomplished in ExpandNames and/or
159
    CheckPrereq, doing these separately is better because:
160

161
      - ExpandNames is left as purely a lock-related function
162
      - CheckPrereq is run after we have acquired locks (and possibly
163
        waited for them)
164

165
    The function is allowed to change the self.op attribute so that
166
    later methods no longer need to worry about missing parameters.
167

168
    """
169
    pass
170

    
171
  def ExpandNames(self):
172
    """Expand names for this LU.
173

174
    This method is called before starting to execute the opcode, and it should
175
    update all the parameters of the opcode to their canonical form (e.g. a
176
    short node name must be fully expanded after this method has successfully
177
    completed). This way locking, hooks, logging, etc. can work correctly.
178

179
    LUs which implement this method must also populate the self.needed_locks
180
    member, as a dict with lock levels as keys, and a list of needed lock names
181
    as values. Rules:
182

183
      - use an empty dict if you don't need any lock
184
      - if you don't need any lock at a particular level omit that level
185
      - don't put anything for the BGL level
186
      - if you want all locks at a level use locking.ALL_SET as a value
187

188
    If you need to share locks (rather than acquire them exclusively) at one
189
    level you can modify self.share_locks, setting a true value (usually 1) for
190
    that level. By default locks are not shared.
191

192
    This function can also define a list of tasklets, which then will be
193
    executed in order instead of the usual LU-level CheckPrereq and Exec
194
    functions, if those are not defined by the LU.
195

196
    Examples::
197

198
      # Acquire all nodes and one instance
199
      self.needed_locks = {
200
        locking.LEVEL_NODE: locking.ALL_SET,
201
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
202
      }
203
      # Acquire just two nodes
204
      self.needed_locks = {
205
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
206
      }
207
      # Acquire no locks
208
      self.needed_locks = {} # No, you can't leave it to the default value None
209

210
    """
211
    # The implementation of this method is mandatory only if the new LU is
212
    # concurrent, so that old LUs don't need to be changed all at the same
213
    # time.
214
    if self.REQ_BGL:
215
      self.needed_locks = {} # Exclusive LUs don't need locks.
216
    else:
217
      raise NotImplementedError
218

    
219
  def DeclareLocks(self, level):
220
    """Declare LU locking needs for a level
221

222
    While most LUs can just declare their locking needs at ExpandNames time,
223
    sometimes there's the need to calculate some locks after having acquired
224
    the ones before. This function is called just before acquiring locks at a
225
    particular level, but after acquiring the ones at lower levels, and permits
226
    such calculations. It can be used to modify self.needed_locks, and by
227
    default it does nothing.
228

229
    This function is only called if you have something already set in
230
    self.needed_locks for the level.
231

232
    @param level: Locking level which is going to be locked
233
    @type level: member of ganeti.locking.LEVELS
234

235
    """
236

    
237
  def CheckPrereq(self):
238
    """Check prerequisites for this LU.
239

240
    This method should check that the prerequisites for the execution
241
    of this LU are fulfilled. It can do internode communication, but
242
    it should be idempotent - no cluster or system changes are
243
    allowed.
244

245
    The method should raise errors.OpPrereqError in case something is
246
    not fulfilled. Its return value is ignored.
247

248
    This method should also update all the parameters of the opcode to
249
    their canonical form if it hasn't been done by ExpandNames before.
250

251
    """
252
    if self.tasklets is not None:
253
      for (idx, tl) in enumerate(self.tasklets):
254
        logging.debug("Checking prerequisites for tasklet %s/%s",
255
                      idx + 1, len(self.tasklets))
256
        tl.CheckPrereq()
257
    else:
258
      pass
259

    
260
  def Exec(self, feedback_fn):
261
    """Execute the LU.
262

263
    This method should implement the actual work. It should raise
264
    errors.OpExecError for failures that are somewhat dealt with in
265
    code, or expected.
266

267
    """
268
    if self.tasklets is not None:
269
      for (idx, tl) in enumerate(self.tasklets):
270
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
271
        tl.Exec(feedback_fn)
272
    else:
273
      raise NotImplementedError
274

    
275
  def BuildHooksEnv(self):
276
    """Build hooks environment for this LU.
277

278
    @rtype: dict
279
    @return: Dictionary containing the environment that will be used for
280
      running the hooks for this LU. The keys of the dict must not be prefixed
281
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
282
      will extend the environment with additional variables. If no environment
283
      should be defined, an empty dictionary should be returned (not C{None}).
284
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
285
      will not be called.
286

287
    """
288
    raise NotImplementedError
289

    
290
  def BuildHooksNodes(self):
291
    """Build list of nodes to run LU's hooks.
292

293
    @rtype: tuple; (list, list)
294
    @return: Tuple containing a list of node names on which the hook
295
      should run before the execution and a list of node names on which the
296
      hook should run after the execution. If there are no nodes, return an
297
      empty list (not C{None}).
298
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
299
      will not be called.
300

301
    """
302
    raise NotImplementedError
303

    
304
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
305
    """Notify the LU about the results of its hooks.
306

307
    This method is called every time a hooks phase is executed, and notifies
308
    the Logical Unit about the hooks' result. The LU can then use it to alter
309
    its result based on the hooks.  By default the method does nothing and the
310
    previous result is passed back unchanged but any LU can define it if it
311
    wants to use the local cluster hook-scripts somehow.
312

313
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
314
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
315
    @param hook_results: the results of the multi-node hooks rpc call
316
    @param feedback_fn: function used to send feedback back to the caller
317
    @param lu_result: the previous Exec result this LU had, or None
318
        in the PRE phase
319
    @return: the new Exec result, based on the previous result
320
        and hook results
321

322
    """
323
    # The API must be kept, thus we ignore the "unused argument" and
324
    # "could be a function" pylint warnings
325
    # pylint: disable=W0613,R0201
326
    return lu_result
327

    
328
  def _ExpandAndLockInstance(self):
329
    """Helper function to expand and lock an instance.
330

331
    Many LUs that work on an instance take its name in self.op.instance_name
332
    and need to expand it and then declare the expanded name for locking. This
333
    function does it, and then updates self.op.instance_name to the expanded
334
    name. It also initializes needed_locks as a dict, if this hasn't been done
335
    before.
336

337
    """
338
    if self.needed_locks is None:
339
      self.needed_locks = {}
340
    else:
341
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
342
        "_ExpandAndLockInstance called with instance-level locks set"
343
    self.op.instance_name = _ExpandInstanceName(self.cfg,
344
                                                self.op.instance_name)
345
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
346

    
347
  def _LockInstancesNodes(self, primary_only=False):
348
    """Helper function to declare instances' nodes for locking.
349

350
    This function should be called after locking one or more instances to lock
351
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
352
    with all primary or secondary nodes for instances already locked and
353
    present in self.needed_locks[locking.LEVEL_INSTANCE].
354

355
    It should be called from DeclareLocks, and for safety only works if
356
    self.recalculate_locks[locking.LEVEL_NODE] is set.
357

358
    In the future it may grow parameters to just lock some instance's nodes, or
359
    to just lock primaries or secondary nodes, if needed.
360

361
    It should be called in DeclareLocks in a way similar to::
362

363
      if level == locking.LEVEL_NODE:
364
        self._LockInstancesNodes()
365

366
    @type primary_only: boolean
367
    @param primary_only: only lock primary nodes of locked instances
368

369
    """
370
    assert locking.LEVEL_NODE in self.recalculate_locks, \
371
      "_LockInstancesNodes helper function called with no nodes to recalculate"
372

    
373
    # TODO: check if we've really been called with the instance locks held
374

    
375
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
376
    # future we might want to have different behaviors depending on the value
377
    # of self.recalculate_locks[locking.LEVEL_NODE]
378
    wanted_nodes = []
379
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
380
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
381
      wanted_nodes.append(instance.primary_node)
382
      if not primary_only:
383
        wanted_nodes.extend(instance.secondary_nodes)
384

    
385
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
386
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
387
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
388
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
389

    
390
    del self.recalculate_locks[locking.LEVEL_NODE]
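# Illustrative sketch (an assumption, not taken from the original module): a
# typical concurrent LU combines the helpers above by expanding and locking
# the instance in ExpandNames and resolving its nodes in DeclareLocks, once
# the instance lock is actually held.
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()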
391

    
392

    
393
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
394
  """Simple LU which runs no hooks.
395

396
  This LU is intended as a parent for other LogicalUnits which will
397
  run no hooks, in order to reduce duplicate code.
398

399
  """
400
  HPATH = None
401
  HTYPE = None
402

    
403
  def BuildHooksEnv(self):
404
    """Empty BuildHooksEnv for NoHooksLu.
405

406
    This just raises an error.
407

408
    """
409
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
410

    
411
  def BuildHooksNodes(self):
412
    """Empty BuildHooksNodes for NoHooksLU.
413

414
    """
415
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
416

    
417

    
418
class Tasklet:
419
  """Tasklet base class.
420

421
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
422
  they can mix legacy code with tasklets. Locking needs to be done in the LU;
423
  tasklets know nothing about locks.
424

425
  Subclasses must follow these rules:
426
    - Implement CheckPrereq
427
    - Implement Exec
428

429
  """
430
  def __init__(self, lu):
431
    self.lu = lu
432

    
433
    # Shortcuts
434
    self.cfg = lu.cfg
435
    self.rpc = lu.rpc
436

    
437
  def CheckPrereq(self):
438
    """Check prerequisites for this tasklets.
439

440
    This method should check whether the prerequisites for the execution of
441
    this tasklet are fulfilled. It can do internode communication, but it
442
    should be idempotent - no cluster or system changes are allowed.
443

444
    The method should raise errors.OpPrereqError in case something is not
445
    fulfilled. Its return value is ignored.
446

447
    This method should also update all parameters to their canonical form if it
448
    hasn't been done before.
449

450
    """
451
    pass
452

    
453
  def Exec(self, feedback_fn):
454
    """Execute the tasklet.
455

456
    This method should implement the actual work. It should raise
457
    errors.OpExecError for failures that are somewhat dealt with in code, or
458
    expected.
459

460
    """
461
    raise NotImplementedError
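# Illustrative sketch (an assumption, not part of the original module): a
# minimal tasklet and how an LU would use it; LogicalUnit.CheckPrereq and
# LogicalUnit.Exec drive whatever is put into self.tasklets by ExpandNames.
#
#   class _ExampleTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass  # nothing to verify in this sketch
#
#     def Exec(self, feedback_fn):
#       feedback_fn("example tasklet ran")
#
#   # ... and inside some LU's ExpandNames:
#   #   self.tasklets = [_ExampleTasklet(self)]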
462

    
463

    
464
class _QueryBase:
465
  """Base for query utility classes.
466

467
  """
468
  #: Attribute holding field definitions
469
  FIELDS = None
470

    
471
  def __init__(self, filter_, fields, use_locking):
472
    """Initializes this class.
473

474
    """
475
    self.use_locking = use_locking
476

    
477
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
478
                             namefield="name")
479
    self.requested_data = self.query.RequestedData()
480
    self.names = self.query.RequestedNames()
481

    
482
    # Sort only if no names were requested
483
    self.sort_by_name = not self.names
484

    
485
    self.do_locking = None
486
    self.wanted = None
487

    
488
  def _GetNames(self, lu, all_names, lock_level):
489
    """Helper function to determine names asked for in the query.
490

491
    """
492
    if self.do_locking:
493
      names = lu.owned_locks(lock_level)
494
    else:
495
      names = all_names
496

    
497
    if self.wanted == locking.ALL_SET:
498
      assert not self.names
499
      # caller didn't specify names, so ordering is not important
500
      return utils.NiceSort(names)
501

    
502
    # caller specified names and we must keep the same order
503
    assert self.names
504
    assert not self.do_locking or lu.glm.is_owned(lock_level)
505

    
506
    missing = set(self.wanted).difference(names)
507
    if missing:
508
      raise errors.OpExecError("Some items were removed before retrieving"
509
                               " their data: %s" % missing)
510

    
511
    # Return expanded names
512
    return self.wanted
513

    
514
  def ExpandNames(self, lu):
515
    """Expand names for this query.
516

517
    See L{LogicalUnit.ExpandNames}.
518

519
    """
520
    raise NotImplementedError()
521

    
522
  def DeclareLocks(self, lu, level):
523
    """Declare locks for this query.
524

525
    See L{LogicalUnit.DeclareLocks}.
526

527
    """
528
    raise NotImplementedError()
529

    
530
  def _GetQueryData(self, lu):
531
    """Collects all data for this query.
532

533
    @return: Query data object
534

535
    """
536
    raise NotImplementedError()
537

    
538
  def NewStyleQuery(self, lu):
539
    """Collect data and execute query.
540

541
    """
542
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
543
                                  sort_by_name=self.sort_by_name)
544

    
545
  def OldStyleQuery(self, lu):
546
    """Collect data and execute query.
547

548
    """
549
    return self.query.OldStyleQuery(self._GetQueryData(lu),
550
                                    sort_by_name=self.sort_by_name)
551

    
552

    
553
def _ShareAll():
554
  """Returns a dict declaring all lock levels shared.
555

556
  """
557
  return dict.fromkeys(locking.LEVELS, 1)
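# Illustrative usage (an assumption): a read-only LU can declare every lock
# level as shared in one go by reusing _ShareAll in its ExpandNames:
#
#   def ExpandNames(self):
#     self.share_locks = _ShareAll()
#     self.needed_locks = {
#       locking.LEVEL_NODE: locking.ALL_SET,
#       }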
558

    
559

    
560
def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
561
                              cur_group_uuid):
562
  """Checks if node groups for locked instances are still correct.
563

564
  @type cfg: L{config.ConfigWriter}
565
  @param cfg: Cluster configuration
566
  @type instances: dict; string as key, L{objects.Instance} as value
567
  @param instances: Dictionary, instance name as key, instance object as value
568
  @type owned_groups: iterable of string
569
  @param owned_groups: List of owned groups
570
  @type owned_nodes: iterable of string
571
  @param owned_nodes: List of owned nodes
572
  @type cur_group_uuid: string or None
573
  @param cur_group_uuid: Optional group UUID to check against instance's groups
574

575
  """
576
  for (name, inst) in instances.items():
577
    assert owned_nodes.issuperset(inst.all_nodes), \
578
      "Instance %s's nodes changed while we kept the lock" % name
579

    
580
    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
581

    
582
    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
583
      "Instance %s has no node in group %s" % (name, cur_group_uuid)
584

    
585

    
586
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
587
  """Checks if the owned node groups are still correct for an instance.
588

589
  @type cfg: L{config.ConfigWriter}
590
  @param cfg: The cluster configuration
591
  @type instance_name: string
592
  @param instance_name: Instance name
593
  @type owned_groups: set or frozenset
594
  @param owned_groups: List of currently owned node groups
595

596
  """
597
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
598

    
599
  if not owned_groups.issuperset(inst_groups):
600
    raise errors.OpPrereqError("Instance %s's node groups changed since"
601
                               " locks were acquired, current groups are"
602
                               " are '%s', owning groups '%s'; retry the"
603
                               " operation" %
604
                               (instance_name,
605
                                utils.CommaJoin(inst_groups),
606
                                utils.CommaJoin(owned_groups)),
607
                               errors.ECODE_STATE)
608

    
609
  return inst_groups
610

    
611

    
612
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
613
  """Checks if the instances in a node group are still correct.
614

615
  @type cfg: L{config.ConfigWriter}
616
  @param cfg: The cluster configuration
617
  @type group_uuid: string
618
  @param group_uuid: Node group UUID
619
  @type owned_instances: set or frozenset
620
  @param owned_instances: List of currently owned instances
621

622
  """
623
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
624
  if owned_instances != wanted_instances:
625
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
626
                               " locks were acquired, wanted '%s', have '%s';"
627
                               " retry the operation" %
628
                               (group_uuid,
629
                                utils.CommaJoin(wanted_instances),
630
                                utils.CommaJoin(owned_instances)),
631
                               errors.ECODE_STATE)
632

    
633
  return wanted_instances
634

    
635

    
636
def _SupportsOob(cfg, node):
637
  """Tells if node supports OOB.
638

639
  @type cfg: L{config.ConfigWriter}
640
  @param cfg: The cluster configuration
641
  @type node: L{objects.Node}
642
  @param node: The node
643
  @return: The OOB script if supported or an empty string otherwise
644

645
  """
646
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
647

    
648

    
649
def _GetWantedNodes(lu, nodes):
650
  """Returns list of checked and expanded node names.
651

652
  @type lu: L{LogicalUnit}
653
  @param lu: the logical unit on whose behalf we execute
654
  @type nodes: list
655
  @param nodes: list of node names or None for all nodes
656
  @rtype: list
657
  @return: the list of nodes, sorted
658
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
659

660
  """
661
  if nodes:
662
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
663

    
664
  return utils.NiceSort(lu.cfg.GetNodeList())
665

    
666

    
667
def _GetWantedInstances(lu, instances):
668
  """Returns list of checked and expanded instance names.
669

670
  @type lu: L{LogicalUnit}
671
  @param lu: the logical unit on whose behalf we execute
672
  @type instances: list
673
  @param instances: list of instance names or None for all instances
674
  @rtype: list
675
  @return: the list of instances, sorted
676
  @raise errors.OpPrereqError: if the instances parameter is wrong type
677
  @raise errors.OpPrereqError: if any of the passed instances is not found
678

679
  """
680
  if instances:
681
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
682
  else:
683
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
684
  return wanted
685

    
686

    
687
def _GetUpdatedParams(old_params, update_dict,
688
                      use_default=True, use_none=False):
689
  """Return the new version of a parameter dictionary.
690

691
  @type old_params: dict
692
  @param old_params: old parameters
693
  @type update_dict: dict
694
  @param update_dict: dict containing new parameter values, or
695
      constants.VALUE_DEFAULT to reset the parameter to its default
696
      value
697
  @type use_default: boolean
698
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
699
      values as 'to be deleted' values
700
  @type use_none: boolean
701
  @param use_none: whether to recognise C{None} values as 'to be
702
      deleted' values
703
  @rtype: dict
704
  @return: the new parameter dictionary
705

706
  """
707
  params_copy = copy.deepcopy(old_params)
708
  for key, val in update_dict.iteritems():
709
    if ((use_default and val == constants.VALUE_DEFAULT) or
710
        (use_none and val is None)):
711
      try:
712
        del params_copy[key]
713
      except KeyError:
714
        pass
715
    else:
716
      params_copy[key] = val
717
  return params_copy
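# Example of the merge semantics above (values are illustrative only):
# passing constants.VALUE_DEFAULT for a key (with use_default=True) removes
# it so the cluster-level default applies again; other keys are overwritten
# or added.
#
#   old = {"boot_order": "cd", "acpi": False}
#   new = _GetUpdatedParams(old, {"boot_order": constants.VALUE_DEFAULT,
#                                 "use_localtime": True})
#   # new == {"acpi": False, "use_localtime": True}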
718

    
719

    
720
def _ReleaseLocks(lu, level, names=None, keep=None):
721
  """Releases locks owned by an LU.
722

723
  @type lu: L{LogicalUnit}
724
  @param lu: the logical unit on whose behalf the locks are released
  @type level: member of ganeti.locking.LEVELS
  @param level: Lock level
725
  @type names: list or None
726
  @param names: Names of locks to release
727
  @type keep: list or None
728
  @param keep: Names of locks to retain
729

730
  """
731
  assert not (keep is not None and names is not None), \
732
         "Only one of the 'names' and the 'keep' parameters can be given"
733

    
734
  if names is not None:
735
    should_release = names.__contains__
736
  elif keep:
737
    should_release = lambda name: name not in keep
738
  else:
739
    should_release = None
740

    
741
  if should_release:
742
    retain = []
743
    release = []
744

    
745
    # Determine which locks to release
746
    for name in lu.owned_locks(level):
747
      if should_release(name):
748
        release.append(name)
749
      else:
750
        retain.append(name)
751

    
752
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
753

    
754
    # Release just some locks
755
    lu.glm.release(level, names=release)
756

    
757
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
758
  else:
759
    # Release everything
760
    lu.glm.release(level)
761

    
762
    assert not lu.glm.is_owned(level), "No locks should be owned"
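# Illustrative usage (an assumption): after narrowing down which nodes it
# really needs, an LU can drop the node locks acquired optimistically:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[self.instance.primary_node])
#
# or release an explicit set via the "names" argument; passing both is
# rejected by the assertion above.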
763

    
764

    
765
def _MapInstanceDisksToNodes(instances):
766
  """Creates a map from (node, volume) to instance name.
767

768
  @type instances: list of L{objects.Instance}
769
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
770

771
  """
772
  return dict(((node, vol), inst.name)
773
              for inst in instances
774
              for (node, vols) in inst.MapLVsByNode().items()
775
              for vol in vols)
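# Example of the resulting structure (names and volume format are
# illustrative assumptions): for an instance "inst1.example.com" with a
# single LV "xenvg/disk0" on "node1.example.com" the map would be
#
#   {("node1.example.com", "xenvg/disk0"): "inst1.example.com"}
#
# i.e. each (node name, volume name) pair points back at the owning instance.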
776

    
777

    
778
def _RunPostHook(lu, node_name):
779
  """Runs the post-hook for an opcode on a single node.
780

781
  """
782
  hm = lu.proc.BuildHooksManager(lu)
783
  try:
784
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
785
  except:
786
    # pylint: disable=W0702
787
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
788

    
789

    
790
def _CheckOutputFields(static, dynamic, selected):
791
  """Checks whether all selected fields are valid.
792

793
  @type static: L{utils.FieldSet}
794
  @param static: static fields set
795
  @type dynamic: L{utils.FieldSet}
796
  @param dynamic: dynamic fields set
  @type selected: list of strings
  @param selected: the fields selected by the caller, to be validated
797

798
  """
799
  f = utils.FieldSet()
800
  f.Extend(static)
801
  f.Extend(dynamic)
802

    
803
  delta = f.NonMatching(selected)
804
  if delta:
805
    raise errors.OpPrereqError("Unknown output fields selected: %s"
806
                               % ",".join(delta), errors.ECODE_INVAL)
807

    
808

    
809
def _CheckGlobalHvParams(params):
810
  """Validates that given hypervisor params are not global ones.
811

812
  This will ensure that instances don't get customised versions of
813
  global params.
814

815
  """
816
  used_globals = constants.HVC_GLOBALS.intersection(params)
817
  if used_globals:
818
    msg = ("The following hypervisor parameters are global and cannot"
819
           " be customized at instance level, please modify them at"
820
           " cluster level: %s" % utils.CommaJoin(used_globals))
821
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
822

    
823

    
824
def _CheckNodeOnline(lu, node, msg=None):
825
  """Ensure that a given node is online.
826

827
  @param lu: the LU on behalf of which we make the check
828
  @param node: the node to check
829
  @param msg: if passed, should be a message to replace the default one
830
  @raise errors.OpPrereqError: if the node is offline
831

832
  """
833
  if msg is None:
834
    msg = "Can't use offline node"
835
  if lu.cfg.GetNodeInfo(node).offline:
836
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
837

    
838

    
839
def _CheckNodeNotDrained(lu, node):
840
  """Ensure that a given node is not drained.
841

842
  @param lu: the LU on behalf of which we make the check
843
  @param node: the node to check
844
  @raise errors.OpPrereqError: if the node is drained
845

846
  """
847
  if lu.cfg.GetNodeInfo(node).drained:
848
    raise errors.OpPrereqError("Can't use drained node %s" % node,
849
                               errors.ECODE_STATE)
850

    
851

    
852
def _CheckNodeVmCapable(lu, node):
853
  """Ensure that a given node is vm capable.
854

855
  @param lu: the LU on behalf of which we make the check
856
  @param node: the node to check
857
  @raise errors.OpPrereqError: if the node is not vm capable
858

859
  """
860
  if not lu.cfg.GetNodeInfo(node).vm_capable:
861
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
862
                               errors.ECODE_STATE)
863

    
864

    
865
def _CheckNodeHasOS(lu, node, os_name, force_variant):
866
  """Ensure that a node supports a given OS.
867

868
  @param lu: the LU on behalf of which we make the check
869
  @param node: the node to check
870
  @param os_name: the OS to query about
871
  @param force_variant: whether to ignore variant errors
872
  @raise errors.OpPrereqError: if the node is not supporting the OS
873

874
  """
875
  result = lu.rpc.call_os_get(node, os_name)
876
  result.Raise("OS '%s' not in supported OS list for node %s" %
877
               (os_name, node),
878
               prereq=True, ecode=errors.ECODE_INVAL)
879
  if not force_variant:
880
    _CheckOSVariant(result.payload, os_name)
881

    
882

    
883
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
884
  """Ensure that a node has the given secondary ip.
885

886
  @type lu: L{LogicalUnit}
887
  @param lu: the LU on behalf of which we make the check
888
  @type node: string
889
  @param node: the node to check
890
  @type secondary_ip: string
891
  @param secondary_ip: the ip to check
892
  @type prereq: boolean
893
  @param prereq: whether to throw a prerequisite or an execute error
894
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
895
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
896

897
  """
898
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
899
  result.Raise("Failure checking secondary ip on node %s" % node,
900
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
901
  if not result.payload:
902
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
903
           " please fix and re-run this command" % secondary_ip)
904
    if prereq:
905
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
906
    else:
907
      raise errors.OpExecError(msg)
908

    
909

    
910
def _GetClusterDomainSecret():
911
  """Reads the cluster domain secret.
912

913
  """
914
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
915
                               strict=True)
916

    
917

    
918
def _CheckInstanceDown(lu, instance, reason):
919
  """Ensure that an instance is not running."""
920
  if instance.admin_up:
921
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
922
                               (instance.name, reason), errors.ECODE_STATE)
923

    
924
  pnode = instance.primary_node
925
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
926
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
927
              prereq=True, ecode=errors.ECODE_ENVIRON)
928

    
929
  if instance.name in ins_l.payload:
930
    raise errors.OpPrereqError("Instance %s is running, %s" %
931
                               (instance.name, reason), errors.ECODE_STATE)
932

    
933

    
934
def _ExpandItemName(fn, name, kind):
935
  """Expand an item name.
936

937
  @param fn: the function to use for expansion
938
  @param name: requested item name
939
  @param kind: text description ('Node' or 'Instance')
940
  @return: the resolved (full) name
941
  @raise errors.OpPrereqError: if the item is not found
942

943
  """
944
  full_name = fn(name)
945
  if full_name is None:
946
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
947
                               errors.ECODE_NOENT)
948
  return full_name
949

    
950

    
951
def _ExpandNodeName(cfg, name):
952
  """Wrapper over L{_ExpandItemName} for nodes."""
953
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
954

    
955

    
956
def _ExpandInstanceName(cfg, name):
957
  """Wrapper over L{_ExpandItemName} for instance."""
958
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
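# Illustrative usage (an assumption, mirroring _ExpandAndLockInstance): the
# wrappers turn a possibly abbreviated name into the fully qualified one
# known to the configuration, raising OpPrereqError if it cannot be resolved.
#
#   self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
#   self.op.instance_name = _ExpandInstanceName(self.cfg,
#                                               self.op.instance_name)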
959

    
960

    
961
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
962
                          memory, vcpus, nics, disk_template, disks,
963
                          bep, hvp, hypervisor_name, tags):
964
  """Builds instance related env variables for hooks
965

966
  This builds the hook environment from individual variables.
967

968
  @type name: string
969
  @param name: the name of the instance
970
  @type primary_node: string
971
  @param primary_node: the name of the instance's primary node
972
  @type secondary_nodes: list
973
  @param secondary_nodes: list of secondary nodes as strings
974
  @type os_type: string
975
  @param os_type: the name of the instance's OS
976
  @type status: boolean
977
  @param status: the should_run status of the instance
978
  @type memory: string
979
  @param memory: the memory size of the instance
980
  @type vcpus: string
981
  @param vcpus: the count of VCPUs the instance has
982
  @type nics: list
983
  @param nics: list of tuples (ip, mac, mode, link) representing
984
      the NICs the instance has
985
  @type disk_template: string
986
  @param disk_template: the disk template of the instance
987
  @type disks: list
988
  @param disks: the list of (size, mode) pairs
989
  @type bep: dict
990
  @param bep: the backend parameters for the instance
991
  @type hvp: dict
992
  @param hvp: the hypervisor parameters for the instance
993
  @type hypervisor_name: string
994
  @param hypervisor_name: the hypervisor for the instance
995
  @type tags: list
996
  @param tags: list of instance tags as strings
997
  @rtype: dict
998
  @return: the hook environment for this instance
999

1000
  """
1001
  if status:
1002
    str_status = "up"
1003
  else:
1004
    str_status = "down"
1005
  env = {
1006
    "OP_TARGET": name,
1007
    "INSTANCE_NAME": name,
1008
    "INSTANCE_PRIMARY": primary_node,
1009
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
1010
    "INSTANCE_OS_TYPE": os_type,
1011
    "INSTANCE_STATUS": str_status,
1012
    "INSTANCE_MEMORY": memory,
1013
    "INSTANCE_VCPUS": vcpus,
1014
    "INSTANCE_DISK_TEMPLATE": disk_template,
1015
    "INSTANCE_HYPERVISOR": hypervisor_name,
1016
  }
1017

    
1018
  if nics:
1019
    nic_count = len(nics)
1020
    for idx, (ip, mac, mode, link) in enumerate(nics):
1021
      if ip is None:
1022
        ip = ""
1023
      env["INSTANCE_NIC%d_IP" % idx] = ip
1024
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1025
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1026
      env["INSTANCE_NIC%d_LINK" % idx] = link
1027
      if mode == constants.NIC_MODE_BRIDGED:
1028
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1029
  else:
1030
    nic_count = 0
1031

    
1032
  env["INSTANCE_NIC_COUNT"] = nic_count
1033

    
1034
  if disks:
1035
    disk_count = len(disks)
1036
    for idx, (size, mode) in enumerate(disks):
1037
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1038
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1039
  else:
1040
    disk_count = 0
1041

    
1042
  env["INSTANCE_DISK_COUNT"] = disk_count
1043

    
1044
  if not tags:
1045
    tags = []
1046

    
1047
  env["INSTANCE_TAGS"] = " ".join(tags)
1048

    
1049
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1050
    for key, value in source.items():
1051
      env["INSTANCE_%s_%s" % (kind, key)] = value
1052

    
1053
  return env
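# Example of the resulting environment (values are illustrative only): for an
# instance with one NIC and one disk the returned dict contains, among others,
#
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#    "INSTANCE_DISK_COUNT": 1,
#    "INSTANCE_DISK0_SIZE": 1024,
#    "INSTANCE_DISK0_MODE": "rw"}
#
# The hooks runner later prefixes every key with "GANETI_".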
1054

    
1055

    
1056
def _NICListToTuple(lu, nics):
1057
  """Build a list of nic information tuples.
1058

1059
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1060
  value in LUInstanceQueryData.
1061

1062
  @type lu:  L{LogicalUnit}
1063
  @param lu: the logical unit on whose behalf we execute
1064
  @type nics: list of L{objects.NIC}
1065
  @param nics: list of nics to convert to hooks tuples
1066

1067
  """
1068
  hooks_nics = []
1069
  cluster = lu.cfg.GetClusterInfo()
1070
  for nic in nics:
1071
    ip = nic.ip
1072
    mac = nic.mac
1073
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1074
    mode = filled_params[constants.NIC_MODE]
1075
    link = filled_params[constants.NIC_LINK]
1076
    hooks_nics.append((ip, mac, mode, link))
1077
  return hooks_nics
1078

    
1079

    
1080
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1081
  """Builds instance related env variables for hooks from an object.
1082

1083
  @type lu: L{LogicalUnit}
1084
  @param lu: the logical unit on whose behalf we execute
1085
  @type instance: L{objects.Instance}
1086
  @param instance: the instance for which we should build the
1087
      environment
1088
  @type override: dict
1089
  @param override: dictionary with key/values that will override
1090
      our values
1091
  @rtype: dict
1092
  @return: the hook environment dictionary
1093

1094
  """
1095
  cluster = lu.cfg.GetClusterInfo()
1096
  bep = cluster.FillBE(instance)
1097
  hvp = cluster.FillHV(instance)
1098
  args = {
1099
    "name": instance.name,
1100
    "primary_node": instance.primary_node,
1101
    "secondary_nodes": instance.secondary_nodes,
1102
    "os_type": instance.os,
1103
    "status": instance.admin_up,
1104
    "memory": bep[constants.BE_MEMORY],
1105
    "vcpus": bep[constants.BE_VCPUS],
1106
    "nics": _NICListToTuple(lu, instance.nics),
1107
    "disk_template": instance.disk_template,
1108
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1109
    "bep": bep,
1110
    "hvp": hvp,
1111
    "hypervisor_name": instance.hypervisor,
1112
    "tags": instance.tags,
1113
  }
1114
  if override:
1115
    args.update(override)
1116
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1117

    
1118

    
1119
def _AdjustCandidatePool(lu, exceptions):
1120
  """Adjust the candidate pool after node operations.
1121

1122
  """
1123
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1124
  if mod_list:
1125
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1126
               utils.CommaJoin(node.name for node in mod_list))
1127
    for name in mod_list:
1128
      lu.context.ReaddNode(name)
1129
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1130
  if mc_now > mc_max:
1131
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1132
               (mc_now, mc_max))
1133

    
1134

    
1135
def _DecideSelfPromotion(lu, exceptions=None):
1136
  """Decide whether I should promote myself as a master candidate.
1137

1138
  """
1139
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1140
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1141
  # the new node will increase mc_max by one, so:
1142
  mc_should = min(mc_should + 1, cp_size)
1143
  return mc_now < mc_should
1144

    
1145

    
1146
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1147
  """Check that the brigdes needed by a list of nics exist.
1148

1149
  """
1150
  cluster = lu.cfg.GetClusterInfo()
1151
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1152
  brlist = [params[constants.NIC_LINK] for params in paramslist
1153
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1154
  if brlist:
1155
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1156
    result.Raise("Error checking bridges on destination node '%s'" %
1157
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1158

    
1159

    
1160
def _CheckInstanceBridgesExist(lu, instance, node=None):
1161
  """Check that the brigdes needed by an instance exist.
1162

1163
  """
1164
  if node is None:
1165
    node = instance.primary_node
1166
  _CheckNicsBridgesExist(lu, instance.nics, node)
1167

    
1168

    
1169
def _CheckOSVariant(os_obj, name):
1170
  """Check whether an OS name conforms to the os variants specification.
1171

1172
  @type os_obj: L{objects.OS}
1173
  @param os_obj: OS object to check
1174
  @type name: string
1175
  @param name: OS name passed by the user, to check for validity
1176

1177
  """
1178
  variant = objects.OS.GetVariant(name)
1179
  if not os_obj.supported_variants:
1180
    if variant:
1181
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1182
                                 " passed)" % (os_obj.name, variant),
1183
                                 errors.ECODE_INVAL)
1184
    return
1185
  if not variant:
1186
    raise errors.OpPrereqError("OS name must include a variant",
1187
                               errors.ECODE_INVAL)
1188

    
1189
  if variant not in os_obj.supported_variants:
1190
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1191

    
1192

    
1193
def _GetNodeInstancesInner(cfg, fn):
1194
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1195

    
1196

    
1197
def _GetNodeInstances(cfg, node_name):
1198
  """Returns a list of all primary and secondary instances on a node.
1199

1200
  """
1201

    
1202
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1203

    
1204

    
1205
def _GetNodePrimaryInstances(cfg, node_name):
1206
  """Returns primary instances on a node.
1207

1208
  """
1209
  return _GetNodeInstancesInner(cfg,
1210
                                lambda inst: node_name == inst.primary_node)
1211

    
1212

    
1213
def _GetNodeSecondaryInstances(cfg, node_name):
1214
  """Returns secondary instances on a node.
1215

1216
  """
1217
  return _GetNodeInstancesInner(cfg,
1218
                                lambda inst: node_name in inst.secondary_nodes)
1219

    
1220

    
1221
def _GetStorageTypeArgs(cfg, storage_type):
1222
  """Returns the arguments for a storage type.
1223

1224
  """
1225
  # Special case for file storage
1226
  if storage_type == constants.ST_FILE:
1227
    # storage.FileStorage wants a list of storage directories
1228
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1229

    
1230
  return []
1231

    
1232

    
1233
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
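  """Returns the indices of an instance's disks that are reported faulty.

  The mirror status of all the instance's disks is queried on node
  C{node_name}; disks whose ldisk status is C{constants.LDS_FAULTY} are
  returned by their index in C{instance.disks}.

  """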
1234
  faulty = []
1235

    
1236
  for dev in instance.disks:
1237
    cfg.SetDiskID(dev, node_name)
1238

    
1239
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1240
  result.Raise("Failed to get disk status from node %s" % node_name,
1241
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1242

    
1243
  for idx, bdev_status in enumerate(result.payload):
1244
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1245
      faulty.append(idx)
1246

    
1247
  return faulty
1248

    
1249

    
1250
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1251
  """Check the sanity of iallocator and node arguments and use the
1252
  cluster-wide iallocator if appropriate.
1253

1254
  Check that at most one of (iallocator, node) is specified. If none is
1255
  specified, then the LU's opcode's iallocator slot is filled with the
1256
  cluster-wide default iallocator.
1257

1258
  @type iallocator_slot: string
1259
  @param iallocator_slot: the name of the opcode iallocator slot
1260
  @type node_slot: string
1261
  @param node_slot: the name of the opcode target node slot
1262

1263
  """
1264
  node = getattr(lu.op, node_slot, None)
1265
  iallocator = getattr(lu.op, iallocator_slot, None)
1266

    
1267
  if node is not None and iallocator is not None:
1268
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1269
                               errors.ECODE_INVAL)
1270
  elif node is None and iallocator is None:
1271
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1272
    if default_iallocator:
1273
      setattr(lu.op, iallocator_slot, default_iallocator)
1274
    else:
1275
      raise errors.OpPrereqError("No iallocator or node given and no"
1276
                                 " cluster-wide default iallocator found;"
1277
                                 " please specify either an iallocator or a"
1278
                                 " node, or set a cluster-wide default"
1279
                                 " iallocator")
1280

    
1281

    
1282
def _GetDefaultIAllocator(cfg, iallocator):
1283
  """Decides on which iallocator to use.
1284

1285
  @type cfg: L{config.ConfigWriter}
1286
  @param cfg: Cluster configuration object
1287
  @type iallocator: string or None
1288
  @param iallocator: Iallocator specified in opcode
1289
  @rtype: string
1290
  @return: Iallocator name
1291

1292
  """
1293
  if not iallocator:
1294
    # Use default iallocator
1295
    iallocator = cfg.GetDefaultIAllocator()
1296

    
1297
  if not iallocator:
1298
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1299
                               " opcode nor as a cluster-wide default",
1300
                               errors.ECODE_INVAL)
1301

    
1302
  return iallocator
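# Illustrative usage (an assumption): LUs that accept either an explicit
# target node or an iallocator typically call these helpers from
# CheckArguments/CheckPrereq, roughly like this (slot names are examples):
#
#   def CheckArguments(self):
#     _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
#
#   # ... later, when only the allocator name is needed:
#   #   ialloc = _GetDefaultIAllocator(self.cfg, self.op.iallocator)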
1303

    
1304

    
1305
class LUClusterPostInit(LogicalUnit):
1306
  """Logical unit for running hooks after cluster initialization.
1307

1308
  """
1309
  HPATH = "cluster-init"
1310
  HTYPE = constants.HTYPE_CLUSTER
1311

    
1312
  def BuildHooksEnv(self):
1313
    """Build hooks env.
1314

1315
    """
1316
    return {
1317
      "OP_TARGET": self.cfg.GetClusterName(),
1318
      }
1319

    
1320
  def BuildHooksNodes(self):
1321
    """Build hooks nodes.
1322

1323
    """
1324
    return ([], [self.cfg.GetMasterNode()])
1325

    
1326
  def Exec(self, feedback_fn):
1327
    """Nothing to do.
1328

1329
    """
1330
    return True
1331

    
1332

    
1333
class LUClusterDestroy(LogicalUnit):
1334
  """Logical unit for destroying the cluster.
1335

1336
  """
1337
  HPATH = "cluster-destroy"
1338
  HTYPE = constants.HTYPE_CLUSTER
1339

    
1340
  def BuildHooksEnv(self):
1341
    """Build hooks env.
1342

1343
    """
1344
    return {
1345
      "OP_TARGET": self.cfg.GetClusterName(),
1346
      }
1347

    
1348
  def BuildHooksNodes(self):
1349
    """Build hooks nodes.
1350

1351
    """
1352
    return ([], [])
1353

    
1354
  def CheckPrereq(self):
1355
    """Check prerequisites.
1356

1357
    This checks whether the cluster is empty.
1358

1359
    Any errors are signaled by raising errors.OpPrereqError.
1360

1361
    """
1362
    master = self.cfg.GetMasterNode()
1363

    
1364
    nodelist = self.cfg.GetNodeList()
1365
    if len(nodelist) != 1 or nodelist[0] != master:
1366
      raise errors.OpPrereqError("There are still %d node(s) in"
1367
                                 " this cluster." % (len(nodelist) - 1),
1368
                                 errors.ECODE_INVAL)
1369
    instancelist = self.cfg.GetInstanceList()
1370
    if instancelist:
1371
      raise errors.OpPrereqError("There are still %d instance(s) in"
1372
                                 " this cluster." % len(instancelist),
1373
                                 errors.ECODE_INVAL)
1374

    
1375
  def Exec(self, feedback_fn):
1376
    """Destroys the cluster.
1377

1378
    """
1379
    master = self.cfg.GetMasterNode()
1380

    
1381
    # Run post hooks on master node before it's removed
1382
    _RunPostHook(self, master)
1383

    
1384
    result = self.rpc.call_node_deactivate_master_ip(master)
1385
    result.Raise("Could not disable the master role")
1386

    
1387
    return master
1388

    
1389

    
1390
def _VerifyCertificate(filename):
1391
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1392

1393
  @type filename: string
1394
  @param filename: Path to PEM file
1395

1396
  """
1397
  try:
1398
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1399
                                           utils.ReadFile(filename))
1400
  except Exception, err: # pylint: disable=W0703
1401
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1402
            "Failed to load X509 certificate %s: %s" % (filename, err))
1403

    
1404
  (errcode, msg) = \
1405
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1406
                                constants.SSL_CERT_EXPIRATION_ERROR)
1407

    
1408
  if msg:
1409
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1410
  else:
1411
    fnamemsg = None
1412

    
1413
  if errcode is None:
1414
    return (None, fnamemsg)
1415
  elif errcode == utils.CERT_WARNING:
1416
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1417
  elif errcode == utils.CERT_ERROR:
1418
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1419

    
1420
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1421

    
1422

    
1423
def _GetAllHypervisorParameters(cluster, instances):
1424
  """Compute the set of all hypervisor parameters.
1425

1426
  @type cluster: L{objects.Cluster}
1427
  @param cluster: the cluster object
1428
  @type instances: list of L{objects.Instance}
1429
  @param instances: additional instances from which to obtain parameters
1430
  @rtype: list of (origin, hypervisor, parameters)
1431
  @return: a list with all parameters found, indicating the hypervisor they
1432
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1433

1434
  """
1435
  hvp_data = []
1436

    
1437
  for hv_name in cluster.enabled_hypervisors:
1438
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1439

    
1440
  for os_name, os_hvp in cluster.os_hvp.items():
1441
    for hv_name, hv_params in os_hvp.items():
1442
      if hv_params:
1443
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1444
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1445

    
1446
  # TODO: collapse identical parameter values in a single one
1447
  for instance in instances:
1448
    if instance.hvparams:
1449
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1450
                       cluster.FillHV(instance)))
1451

    
1452
  return hvp_data
1453

    
1454

    
1455
class _VerifyErrors(object):
1456
  """Mix-in for cluster/group verify LUs.
1457

1458
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1459
  self.op and self._feedback_fn to be available.)
1460

1461
  """
1462
  TCLUSTER = "cluster"
1463
  TNODE = "node"
1464
  TINSTANCE = "instance"
1465

    
1466
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1467
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1468
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1469
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1470
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1471
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1472
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1473
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1474
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1475
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1476
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1477
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1478
  ENODEDRBD = (TNODE, "ENODEDRBD")
1479
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1480
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1481
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1482
  ENODEHV = (TNODE, "ENODEHV")
1483
  ENODELVM = (TNODE, "ENODELVM")
1484
  ENODEN1 = (TNODE, "ENODEN1")
1485
  ENODENET = (TNODE, "ENODENET")
1486
  ENODEOS = (TNODE, "ENODEOS")
1487
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1488
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1489
  ENODERPC = (TNODE, "ENODERPC")
1490
  ENODESSH = (TNODE, "ENODESSH")
1491
  ENODEVERSION = (TNODE, "ENODEVERSION")
1492
  ENODESETUP = (TNODE, "ENODESETUP")
1493
  ENODETIME = (TNODE, "ENODETIME")
1494
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1495

    
1496
  ETYPE_FIELD = "code"
1497
  ETYPE_ERROR = "ERROR"
1498
  ETYPE_WARNING = "WARNING"
1499

    
1500
  def _Error(self, ecode, item, msg, *args, **kwargs):
1501
    """Format an error message.
1502

1503
    Based on the opcode's error_codes parameter, either format a
1504
    parseable error code, or a simpler error string.
1505

1506
    This must be called only from Exec and functions called from Exec.
1507

1508
    """
1509
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1510
    itype, etxt = ecode
1511
    # first complete the msg
1512
    if args:
1513
      msg = msg % args
1514
    # then format the whole message
1515
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1516
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1517
    else:
1518
      if item:
1519
        item = " " + item
1520
      else:
1521
        item = ""
1522
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1523
    # and finally report it via the feedback_fn
1524
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1525

    
1526
  def _ErrorIf(self, cond, *args, **kwargs):
1527
    """Log an error message if the passed condition is True.
1528

1529
    """
1530
    cond = (bool(cond)
1531
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1532
    if cond:
1533
      self._Error(*args, **kwargs)
1534
    # only mark the operation as failed for non-WARNING cases
1535
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1536
      self.bad = self.bad or cond
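# Illustrative usage (an assumption): verification code built on this mix-in
# reports problems against a named item, e.g.
#
#   self._ErrorIf(test, self.ENODEHV, node,
#                 "hypervisor verify failure: '%s'", msg)
#
# Only ETYPE_ERROR entries mark the whole operation as failed; warnings are
# reported via feedback_fn but do not set self.bad.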
1537

    
1538

    
1539
class LUClusterVerify(NoHooksLU):
1540
  """Submits all jobs necessary to verify the cluster.
1541

1542
  """
1543
  REQ_BGL = False
1544

    
1545
  def ExpandNames(self):
1546
    self.needed_locks = {}
1547

    
1548
  def Exec(self, feedback_fn):
1549
    jobs = []
1550

    
1551
    if self.op.group_name:
1552
      groups = [self.op.group_name]
1553
      depends_fn = lambda: None
1554
    else:
1555
      groups = self.cfg.GetNodeGroupList()
1556

    
1557
      # Verify global configuration
1558
      jobs.append([opcodes.OpClusterVerifyConfig()])
1559

    
1560
      # Always depend on global verification
1561
      depends_fn = lambda: [(-len(jobs), [])]
1562

    
1563
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1564
                                              depends=depends_fn())]
1565
                for group in groups)
1566

    
1567
    # Fix up all parameters
1568
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1569
      op.debug_simulate_errors = self.op.debug_simulate_errors
1570
      op.verbose = self.op.verbose
1571
      op.error_codes = self.op.error_codes
1572
      try:
1573
        op.skip_checks = self.op.skip_checks
1574
      except AttributeError:
1575
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1576

    
1577
    return ResultWithJobs(jobs)
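    # For illustration (group names made up): when no specific group was
    # requested, on a cluster with two node groups the submitted jobs end up
    # roughly as
    #   [[OpClusterVerifyConfig()],
    #    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])])],
    #    [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])])]]
    # so every per-group verification depends, via a negative relative job id,
    # on the single configuration-verification job submitted first.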
1578

    
1579

    
1580
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1581
  """Verifies the cluster config.
1582

1583
  """
1584
  REQ_BGL = True
1585

    
1586
  def _VerifyHVP(self, hvp_data):
1587
    """Verifies locally the syntax of the hypervisor parameters.
1588

1589
    """
1590
    for item, hv_name, hv_params in hvp_data:
1591
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1592
             (item, hv_name))
1593
      try:
1594
        hv_class = hypervisor.GetHypervisor(hv_name)
1595
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1596
        hv_class.CheckParameterSyntax(hv_params)
1597
      except errors.GenericError, err:
1598
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1599

    
1600
  def ExpandNames(self):
1601
    # Information can be safely retrieved as the BGL is acquired in exclusive
1602
    # mode
1603
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1604
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1605
    self.all_node_info = self.cfg.GetAllNodesInfo()
1606
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1607
    self.needed_locks = {}
1608

    
1609
  def Exec(self, feedback_fn):
1610
    """Verify integrity of cluster, performing various test on nodes.
1611

1612
    """
1613
    self.bad = False
1614
    self._feedback_fn = feedback_fn
1615

    
1616
    feedback_fn("* Verifying cluster config")
1617

    
1618
    for msg in self.cfg.VerifyConfig():
1619
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
1620

    
1621
    feedback_fn("* Verifying cluster certificate files")
1622

    
1623
    for cert_filename in constants.ALL_CERT_FILES:
1624
      (errcode, msg) = _VerifyCertificate(cert_filename)
1625
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1626

    
1627
    feedback_fn("* Verifying hypervisor parameters")
1628

    
1629
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1630
                                                self.all_inst_info.values()))
1631

    
1632
    feedback_fn("* Verifying all nodes belong to an existing group")
1633

    
1634
    # We do this verification here because, should this bogus circumstance
1635
    # occur, it would never be caught by VerifyGroup, which only acts on
1636
    # nodes/instances reachable from existing node groups.
1637

    
1638
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1639
                         if node.group not in self.all_group_info)
1640

    
1641
    dangling_instances = {}
1642
    no_node_instances = []
1643

    
1644
    for inst in self.all_inst_info.values():
1645
      if inst.primary_node in dangling_nodes:
1646
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1647
      elif inst.primary_node not in self.all_node_info:
1648
        no_node_instances.append(inst.name)
1649

    
1650
    pretty_dangling = [
1651
        "%s (%s)" %
1652
        (node.name,
1653
         utils.CommaJoin(dangling_instances.get(node.name,
1654
                                                ["no instances"])))
1655
        for node in dangling_nodes]
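    # For illustration (host names made up), an entry of pretty_dangling looks
    # like "node3.example.com (inst1.example.com, inst2.example.com)" or
    # "node4.example.com (no instances)".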
1656

    
1657
    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
1658
                  "the following nodes (and their instances) belong to a non"
1659
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1660

    
1661
    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
1662
                  "the following instances have a non-existing primary-node:"
1663
                  " %s", utils.CommaJoin(no_node_instances))
1664

    
1665
    return not self.bad
1666

    
1667

    
1668
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1669
  """Verifies the status of a node group.
1670

1671
  """
1672
  HPATH = "cluster-verify"
1673
  HTYPE = constants.HTYPE_CLUSTER
1674
  REQ_BGL = False
1675

    
1676
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1677

    
1678
  class NodeImage(object):
1679
    """A class representing the logical and physical status of a node.
1680

1681
    @type name: string
1682
    @ivar name: the node name to which this object refers
1683
    @ivar volumes: a structure as returned from
1684
        L{ganeti.backend.GetVolumeList} (runtime)
1685
    @ivar instances: a list of running instances (runtime)
1686
    @ivar pinst: list of configured primary instances (config)
1687
    @ivar sinst: list of configured secondary instances (config)
1688
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1689
        instances for which this node is secondary (config)
1690
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1691
    @ivar dfree: free disk, as reported by the node (runtime)
1692
    @ivar offline: the offline status (config)
1693
    @type rpc_fail: boolean
1694
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1695
        not whether the individual keys were correct) (runtime)
1696
    @type lvm_fail: boolean
1697
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1698
    @type hyp_fail: boolean
1699
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1700
    @type ghost: boolean
1701
    @ivar ghost: whether this node is unknown to the configuration (config)
1702
    @type os_fail: boolean
1703
    @ivar os_fail: whether the RPC call didn't return valid OS data
1704
    @type oslist: list
1705
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1706
    @type vm_capable: boolean
1707
    @ivar vm_capable: whether the node can host instances
1708

1709
    """
1710
    def __init__(self, offline=False, name=None, vm_capable=True):
1711
      self.name = name
1712
      self.volumes = {}
1713
      self.instances = []
1714
      self.pinst = []
1715
      self.sinst = []
1716
      self.sbp = {}
1717
      self.mfree = 0
1718
      self.dfree = 0
1719
      self.offline = offline
1720
      self.vm_capable = vm_capable
1721
      self.rpc_fail = False
1722
      self.lvm_fail = False
1723
      self.hyp_fail = False
1724
      self.ghost = False
1725
      self.os_fail = False
1726
      self.oslist = {}
1727

    
1728
  def ExpandNames(self):
1729
    # This raises errors.OpPrereqError on its own:
1730
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1731

    
1732
    # Get instances in node group; this is unsafe and needs verification later
1733
    inst_names = \
1734
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1735

    
1736
    self.needed_locks = {
1737
      locking.LEVEL_INSTANCE: inst_names,
1738
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1739
      locking.LEVEL_NODE: [],
1740
      }
1741

    
1742
    self.share_locks = _ShareAll()
1743

    
1744
  def DeclareLocks(self, level):
1745
    if level == locking.LEVEL_NODE:
1746
      # Get members of node group; this is unsafe and needs verification later
1747
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1748

    
1749
      all_inst_info = self.cfg.GetAllInstancesInfo()
1750

    
1751
      # In Exec(), we warn about mirrored instances that have primary and
1752
      # secondary living in separate node groups. To fully verify that
1753
      # volumes for these instances are healthy, we will need to do an
1754
      # extra call to their secondaries. We ensure here those nodes will
1755
      # be locked.
1756
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1757
        # Important: access only the instances whose lock is owned
1758
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1759
          nodes.update(all_inst_info[inst].secondary_nodes)
1760

    
1761
      self.needed_locks[locking.LEVEL_NODE] = nodes
1762

    
1763
  def CheckPrereq(self):
1764
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1765
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1766

    
1767
    group_nodes = set(self.group_info.members)
1768
    group_instances = \
1769
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1770

    
1771
    unlocked_nodes = \
1772
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1773

    
1774
    unlocked_instances = \
1775
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1776

    
1777
    if unlocked_nodes:
1778
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1779
                                 utils.CommaJoin(unlocked_nodes),
1780
                                 errors.ECODE_STATE)
1781

    
1782
    if unlocked_instances:
1783
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1784
                                 utils.CommaJoin(unlocked_instances),
1785
                                 errors.ECODE_STATE)
1786

    
1787
    self.all_node_info = self.cfg.GetAllNodesInfo()
1788
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1789

    
1790
    self.my_node_names = utils.NiceSort(group_nodes)
1791
    self.my_inst_names = utils.NiceSort(group_instances)
1792

    
1793
    self.my_node_info = dict((name, self.all_node_info[name])
1794
                             for name in self.my_node_names)
1795

    
1796
    self.my_inst_info = dict((name, self.all_inst_info[name])
1797
                             for name in self.my_inst_names)
1798

    
1799
    # We detect here the nodes that will need the extra RPC calls for verifying
1800
    # split LV volumes; they should be locked.
1801
    extra_lv_nodes = set()
1802

    
1803
    for inst in self.my_inst_info.values():
1804
      if inst.disk_template in constants.DTS_INT_MIRROR:
1805
        for nname in inst.all_nodes:
1806
          if self.all_node_info[nname].group != self.group_uuid:
1807
            extra_lv_nodes.add(nname)
1808

    
1809
    unlocked_lv_nodes = \
1810
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1811

    
1812
    if unlocked_lv_nodes:
1813
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1814
                                 utils.CommaJoin(unlocked_lv_nodes),
1815
                                 errors.ECODE_STATE)
1816
    self.extra_lv_nodes = list(extra_lv_nodes)
1817

    
1818
  def _VerifyNode(self, ninfo, nresult):
1819
    """Perform some basic validation on data returned from a node.
1820

1821
      - check the result data structure is well formed and has all the
1822
        mandatory fields
1823
      - check ganeti version
1824

1825
    @type ninfo: L{objects.Node}
1826
    @param ninfo: the node to check
1827
    @param nresult: the results from the node
1828
    @rtype: boolean
1829
    @return: whether overall this call was successful (and we can expect
1830
         reasonable values in the response)
1831

1832
    """
1833
    node = ninfo.name
1834
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1835

    
1836
    # main result, nresult should be a non-empty dict
1837
    test = not nresult or not isinstance(nresult, dict)
1838
    _ErrorIf(test, self.ENODERPC, node,
1839
                  "unable to verify node: no data returned")
1840
    if test:
1841
      return False
1842

    
1843
    # compares ganeti version
1844
    local_version = constants.PROTOCOL_VERSION
1845
    remote_version = nresult.get("version", None)
1846
    test = not (remote_version and
1847
                isinstance(remote_version, (list, tuple)) and
1848
                len(remote_version) == 2)
1849
    _ErrorIf(test, self.ENODERPC, node,
1850
             "connection to node returned invalid data")
1851
    if test:
1852
      return False
1853

    
1854
    test = local_version != remote_version[0]
1855
    _ErrorIf(test, self.ENODEVERSION, node,
1856
             "incompatible protocol versions: master %s,"
1857
             " node %s", local_version, remote_version[0])
1858
    if test:
1859
      return False
1860

    
1861
    # node seems compatible, we can actually try to look into its results
1862

    
1863
    # full package version
1864
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1865
                  self.ENODEVERSION, node,
1866
                  "software version mismatch: master %s, node %s",
1867
                  constants.RELEASE_VERSION, remote_version[1],
1868
                  code=self.ETYPE_WARNING)
1869

    
1870
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1871
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1872
      for hv_name, hv_result in hyp_result.iteritems():
1873
        test = hv_result is not None
1874
        _ErrorIf(test, self.ENODEHV, node,
1875
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1876

    
1877
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1878
    if ninfo.vm_capable and isinstance(hvp_result, list):
1879
      for item, hv_name, hv_result in hvp_result:
1880
        _ErrorIf(True, self.ENODEHV, node,
1881
                 "hypervisor %s parameter verify failure (source %s): %s",
1882
                 hv_name, item, hv_result)
1883

    
1884
    test = nresult.get(constants.NV_NODESETUP,
1885
                       ["Missing NODESETUP results"])
1886
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1887
             "; ".join(test))
1888

    
1889
    return True
1890

    
1891
  def _VerifyNodeTime(self, ninfo, nresult,
1892
                      nvinfo_starttime, nvinfo_endtime):
1893
    """Check the node time.
1894

1895
    @type ninfo: L{objects.Node}
1896
    @param ninfo: the node to check
1897
    @param nresult: the remote results for the node
1898
    @param nvinfo_starttime: the start time of the RPC call
1899
    @param nvinfo_endtime: the end time of the RPC call
1900

1901
    """
1902
    node = ninfo.name
1903
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1904

    
1905
    ntime = nresult.get(constants.NV_TIME, None)
1906
    try:
1907
      ntime_merged = utils.MergeTime(ntime)
1908
    except (ValueError, TypeError):
1909
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1910
      return
1911

    
1912
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1913
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1914
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1915
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1916
    else:
1917
      ntime_diff = None
1918

    
1919
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1920
             "Node time diverges by at least %s from master node time",
1921
             ntime_diff)
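    # For illustration: if the verify RPC ran in the window [t, t + 2s] and the
    # node reported a merged time of t - 200s (beyond the allowed clock skew),
    # ntime_diff becomes "200.0s" and ENODETIME is reported; nodes within
    # NODE_MAX_CLOCK_SKEW of the window pass silently.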
1922

    
1923
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1924
    """Check the node LVM results.
1925

1926
    @type ninfo: L{objects.Node}
1927
    @param ninfo: the node to check
1928
    @param nresult: the remote results for the node
1929
    @param vg_name: the configured VG name
1930

1931
    """
1932
    if vg_name is None:
1933
      return
1934

    
1935
    node = ninfo.name
1936
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1937

    
1938
    # checks vg existence and size > 20G
1939
    vglist = nresult.get(constants.NV_VGLIST, None)
1940
    test = not vglist
1941
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1942
    if not test:
1943
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1944
                                            constants.MIN_VG_SIZE)
1945
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1946

    
1947
    # check pv names
1948
    pvlist = nresult.get(constants.NV_PVLIST, None)
1949
    test = pvlist is None
1950
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1951
    if not test:
1952
      # check that ':' is not present in PV names, since it's a
1953
      # special character for lvcreate (denotes the range of PEs to
1954
      # use on the PV)
1955
      for _, pvname, owner_vg in pvlist:
1956
        test = ":" in pvname
1957
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1958
                 " '%s' of VG '%s'", pvname, owner_vg)
1959

    
1960
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1961
    """Check the node bridges.
1962

1963
    @type ninfo: L{objects.Node}
1964
    @param ninfo: the node to check
1965
    @param nresult: the remote results for the node
1966
    @param bridges: the expected list of bridges
1967

1968
    """
1969
    if not bridges:
1970
      return
1971

    
1972
    node = ninfo.name
1973
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1974

    
1975
    missing = nresult.get(constants.NV_BRIDGES, None)
1976
    test = not isinstance(missing, list)
1977
    _ErrorIf(test, self.ENODENET, node,
1978
             "did not return valid bridge information")
1979
    if not test:
1980
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
1981
               utils.CommaJoin(sorted(missing)))
1982

    
1983
  def _VerifyNodeNetwork(self, ninfo, nresult):
1984
    """Check the node network connectivity results.
1985

1986
    @type ninfo: L{objects.Node}
1987
    @param ninfo: the node to check
1988
    @param nresult: the remote results for the node
1989

1990
    """
1991
    node = ninfo.name
1992
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1993

    
1994
    test = constants.NV_NODELIST not in nresult
1995
    _ErrorIf(test, self.ENODESSH, node,
1996
             "node hasn't returned node ssh connectivity data")
1997
    if not test:
1998
      if nresult[constants.NV_NODELIST]:
1999
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2000
          _ErrorIf(True, self.ENODESSH, node,
2001
                   "ssh communication with node '%s': %s", a_node, a_msg)
2002

    
2003
    test = constants.NV_NODENETTEST not in nresult
2004
    _ErrorIf(test, self.ENODENET, node,
2005
             "node hasn't returned node tcp connectivity data")
2006
    if not test:
2007
      if nresult[constants.NV_NODENETTEST]:
2008
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2009
        for anode in nlist:
2010
          _ErrorIf(True, self.ENODENET, node,
2011
                   "tcp communication with node '%s': %s",
2012
                   anode, nresult[constants.NV_NODENETTEST][anode])
2013

    
2014
    test = constants.NV_MASTERIP not in nresult
2015
    _ErrorIf(test, self.ENODENET, node,
2016
             "node hasn't returned node master IP reachability data")
2017
    if not test:
2018
      if not nresult[constants.NV_MASTERIP]:
2019
        if node == self.master_node:
2020
          msg = "the master node cannot reach the master IP (not configured?)"
2021
        else:
2022
          msg = "cannot reach the master IP"
2023
        _ErrorIf(True, self.ENODENET, node, msg)
2024

    
2025
  def _VerifyInstance(self, instance, instanceconfig, node_image,
2026
                      diskstatus):
2027
    """Verify an instance.
2028

2029
    This function checks to see if the required block devices are
2030
    available on the instance's node.
2031

2032
    """
2033
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2034
    node_current = instanceconfig.primary_node
2035

    
2036
    node_vol_should = {}
2037
    instanceconfig.MapLVsByNode(node_vol_should)
2038

    
2039
    for node in node_vol_should:
2040
      n_img = node_image[node]
2041
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2042
        # ignore missing volumes on offline or broken nodes
2043
        continue
2044
      for volume in node_vol_should[node]:
2045
        test = volume not in n_img.volumes
2046
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
2047
                 "volume %s missing on node %s", volume, node)
2048

    
2049
    if instanceconfig.admin_up:
2050
      pri_img = node_image[node_current]
2051
      test = instance not in pri_img.instances and not pri_img.offline
2052
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
2053
               "instance not running on its primary node %s",
2054
               node_current)
2055

    
2056
    diskdata = [(nname, success, status, idx)
2057
                for (nname, disks) in diskstatus.items()
2058
                for idx, (success, status) in enumerate(disks)]
2059

    
2060
    for nname, success, bdev_status, idx in diskdata:
2061
      # the 'ghost node' construction in Exec() ensures that we have a
2062
      # node here
2063
      snode = node_image[nname]
2064
      bad_snode = snode.ghost or snode.offline
2065
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
2066
               self.EINSTANCEFAULTYDISK, instance,
2067
               "couldn't retrieve status for disk/%s on %s: %s",
2068
               idx, nname, bdev_status)
2069
      _ErrorIf((instanceconfig.admin_up and success and
2070
                bdev_status.ldisk_status == constants.LDS_FAULTY),
2071
               self.EINSTANCEFAULTYDISK, instance,
2072
               "disk/%s on %s is faulty", idx, nname)
2073

    
2074
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2075
    """Verify if there are any unknown volumes in the cluster.
2076

2077
    The .os, .swap and backup volumes are ignored. All other volumes are
2078
    reported as unknown.
2079

2080
    @type reserved: L{ganeti.utils.FieldSet}
2081
    @param reserved: a FieldSet of reserved volume names
2082

2083
    """
2084
    for node, n_img in node_image.items():
2085
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2086
          self.all_node_info[node].group != self.group_uuid):
2087
        # skip non-healthy nodes
2088
        continue
2089
      for volume in n_img.volumes:
2090
        test = ((node not in node_vol_should or
2091
                volume not in node_vol_should[node]) and
2092
                not reserved.Matches(volume))
2093
        self._ErrorIf(test, self.ENODEORPHANLV, node,
2094
                      "volume %s is unknown", volume)
2095

    
2096
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2097
    """Verify N+1 Memory Resilience.
2098

2099
    Check that if one single node dies we can still start all the
2100
    instances it was primary for.
2101

2102
    """
2103
    cluster_info = self.cfg.GetClusterInfo()
2104
    for node, n_img in node_image.items():
2105
      # This code checks that every node which is now listed as
2106
      # secondary has enough memory to host all instances it is
2107
      # supposed to, should a single other node in the cluster fail.
2108
      # FIXME: not ready for failover to an arbitrary node
2109
      # FIXME: does not support file-backed instances
2110
      # WARNING: we currently take into account down instances as well
2111
      # as up ones, considering that even if they're down someone
2112
      # might want to start them even in the event of a node failure.
2113
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
2114
        # we're skipping nodes marked offline and nodes in other groups from
2115
        # the N+1 warning, since most likely we don't have good memory
2116
        # information from them; we already list instances living on such
2117
        # nodes, and that's enough warning
2118
        continue
2119
      for prinode, instances in n_img.sbp.items():
2120
        needed_mem = 0
2121
        for instance in instances:
2122
          bep = cluster_info.FillBE(instance_cfg[instance])
2123
          if bep[constants.BE_AUTO_BALANCE]:
2124
            needed_mem += bep[constants.BE_MEMORY]
2125
        test = n_img.mfree < needed_mem
2126
        self._ErrorIf(test, self.ENODEN1, node,
2127
                      "not enough memory to accomodate instance failovers"
2128
                      " should node %s fail (%dMiB needed, %dMiB available)",
2129
                      prinode, needed_mem, n_img.mfree)
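    # For illustration (numbers made up): if this node is secondary for two
    # auto-balanced instances with primary node B and BE_MEMORY of 2048 and
    # 4096 MiB, needed_mem is 6144 MiB; reporting only 4096 MiB free raises
    # ENODEN1 for the (node, B) pair.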
2130

    
2131
  @classmethod
2132
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2133
                   (files_all, files_opt, files_mc, files_vm)):
2134
    """Verifies file checksums collected from all nodes.
2135

2136
    @param errorif: Callback for reporting errors
2137
    @param nodeinfo: List of L{objects.Node} objects
2138
    @param master_node: Name of master node
2139
    @param all_nvinfo: RPC results
2140

2141
    """
2142
    # Define functions determining which nodes to consider for a file
2143
    files2nodefn = [
2144
      (files_all, None),
2145
      (files_mc, lambda node: (node.master_candidate or
2146
                               node.name == master_node)),
2147
      (files_vm, lambda node: node.vm_capable),
2148
      ]
2149

    
2150
    # Build mapping from filename to list of nodes which should have the file
2151
    nodefiles = {}
2152
    for (files, fn) in files2nodefn:
2153
      if fn is None:
2154
        filenodes = nodeinfo
2155
      else:
2156
        filenodes = filter(fn, nodeinfo)
2157
      nodefiles.update((filename,
2158
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2159
                       for filename in files)
2160

    
2161
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2162

    
2163
    fileinfo = dict((filename, {}) for filename in nodefiles)
2164
    ignore_nodes = set()
2165

    
2166
    for node in nodeinfo:
2167
      if node.offline:
2168
        ignore_nodes.add(node.name)
2169
        continue
2170

    
2171
      nresult = all_nvinfo[node.name]
2172

    
2173
      if nresult.fail_msg or not nresult.payload:
2174
        node_files = None
2175
      else:
2176
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2177

    
2178
      test = not (node_files and isinstance(node_files, dict))
2179
      errorif(test, cls.ENODEFILECHECK, node.name,
2180
              "Node did not return file checksum data")
2181
      if test:
2182
        ignore_nodes.add(node.name)
2183
        continue
2184

    
2185
      # Build per-checksum mapping from filename to nodes having it
2186
      for (filename, checksum) in node_files.items():
2187
        assert filename in nodefiles
2188
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2189

    
2190
    for (filename, checksums) in fileinfo.items():
2191
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2192

    
2193
      # Nodes having the file
2194
      with_file = frozenset(node_name
2195
                            for nodes in fileinfo[filename].values()
2196
                            for node_name in nodes) - ignore_nodes
2197

    
2198
      expected_nodes = nodefiles[filename] - ignore_nodes
2199

    
2200
      # Nodes missing file
2201
      missing_file = expected_nodes - with_file
2202

    
2203
      if filename in files_opt:
2204
        # All or no nodes
2205
        errorif(missing_file and missing_file != expected_nodes,
2206
                cls.ECLUSTERFILECHECK, None,
2207
                "File %s is optional, but it must exist on all or no"
2208
                " nodes (not found on %s)",
2209
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2210
      else:
2211
        # Non-optional files
2212
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
2213
                "File %s is missing from node(s) %s", filename,
2214
                utils.CommaJoin(utils.NiceSort(missing_file)))
2215

    
2216
        # Warn if a node has a file it shouldn't
2217
        unexpected = with_file - expected_nodes
2218
        errorif(unexpected,
2219
                cls.ECLUSTERFILECHECK, None,
2220
                "File %s should not exist on node(s) %s",
2221
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2222

    
2223
      # See if there are multiple versions of the file
2224
      test = len(checksums) > 1
2225
      if test:
2226
        variants = ["variant %s on %s" %
2227
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2228
                    for (idx, (checksum, nodes)) in
2229
                      enumerate(sorted(checksums.items()))]
2230
      else:
2231
        variants = []
2232

    
2233
      errorif(test, cls.ECLUSTERFILECHECK, None,
2234
              "File %s found with %s different checksums (%s)",
2235
              filename, len(checksums), "; ".join(variants))
2236

    
2237
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2238
                      drbd_map):
2239
    """Verifies and the node DRBD status.
2240

2241
    @type ninfo: L{objects.Node}
2242
    @param ninfo: the node to check
2243
    @param nresult: the remote results for the node
2244
    @param instanceinfo: the dict of instances
2245
    @param drbd_helper: the configured DRBD usermode helper
2246
    @param drbd_map: the DRBD map as returned by
2247
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2248

2249
    """
2250
    node = ninfo.name
2251
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2252

    
2253
    if drbd_helper:
2254
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2255
      test = (helper_result is None)
2256
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
2257
               "no drbd usermode helper returned")
2258
      if helper_result:
2259
        status, payload = helper_result
2260
        test = not status
2261
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2262
                 "drbd usermode helper check unsuccessful: %s", payload)
2263
        test = status and (payload != drbd_helper)
2264
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2265
                 "wrong drbd usermode helper: %s", payload)
2266

    
2267
    # compute the DRBD minors
2268
    node_drbd = {}
2269
    for minor, instance in drbd_map[node].items():
2270
      test = instance not in instanceinfo
2271
      _ErrorIf(test, self.ECLUSTERCFG, None,
2272
               "ghost instance '%s' in temporary DRBD map", instance)
2273
      # ghost instance should not be running, but otherwise we
2274
      # don't give double warnings (both ghost instance and
2275
      # unallocated minor in use)
2276
      if test:
2277
        node_drbd[minor] = (instance, False)
2278
      else:
2279
        instance = instanceinfo[instance]
2280
        node_drbd[minor] = (instance.name, instance.admin_up)
2281

    
2282
    # and now check them
2283
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2284
    test = not isinstance(used_minors, (tuple, list))
2285
    _ErrorIf(test, self.ENODEDRBD, node,
2286
             "cannot parse drbd status file: %s", str(used_minors))
2287
    if test:
2288
      # we cannot check drbd status
2289
      return
2290

    
2291
    for minor, (iname, must_exist) in node_drbd.items():
2292
      test = minor not in used_minors and must_exist
2293
      _ErrorIf(test, self.ENODEDRBD, node,
2294
               "drbd minor %d of instance %s is not active", minor, iname)
2295
    for minor in used_minors:
2296
      test = minor not in node_drbd
2297
      _ErrorIf(test, self.ENODEDRBD, node,
2298
               "unallocated drbd minor %d is in use", minor)
2299

    
2300
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2301
    """Builds the node OS structures.
2302

2303
    @type ninfo: L{objects.Node}
2304
    @param ninfo: the node to check
2305
    @param nresult: the remote results for the node
2306
    @param nimg: the node image object
2307

2308
    """
2309
    node = ninfo.name
2310
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2311

    
2312
    remote_os = nresult.get(constants.NV_OSLIST, None)
2313
    test = (not isinstance(remote_os, list) or
2314
            not compat.all(isinstance(v, list) and len(v) == 7
2315
                           for v in remote_os))
2316

    
2317
    _ErrorIf(test, self.ENODEOS, node,
2318
             "node hasn't returned valid OS data")
2319

    
2320
    nimg.os_fail = test
2321

    
2322
    if test:
2323
      return
2324

    
2325
    os_dict = {}
2326

    
2327
    for (name, os_path, status, diagnose,
2328
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2329

    
2330
      if name not in os_dict:
2331
        os_dict[name] = []
2332

    
2333
      # parameters is a list of lists instead of list of tuples due to
2334
      # JSON lacking a real tuple type, fix it:
2335
      parameters = [tuple(v) for v in parameters]
2336
      os_dict[name].append((os_path, status, diagnose,
2337
                            set(variants), set(parameters), set(api_ver)))
2338

    
2339
    nimg.oslist = os_dict
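    # For illustration (OS name, path and API version made up): nimg.oslist
    # maps OS names to lists of
    # (path, status, diagnose, variants, parameters, api_versions) tuples, e.g.
    #   {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
    #                     set(["default"]), set(), set([20]))]}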
2340

    
2341
  def _VerifyNodeOS(self, ninfo, nimg, base):
2342
    """Verifies the node OS list.
2343

2344
    @type ninfo: L{objects.Node}
2345
    @param ninfo: the node to check
2346
    @param nimg: the node image object
2347
    @param base: the 'template' node we match against (e.g. from the master)
2348

2349
    """
2350
    node = ninfo.name
2351
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2352

    
2353
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2354

    
2355
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2356
    for os_name, os_data in nimg.oslist.items():
2357
      assert os_data, "Empty OS status for OS %s?!" % os_name
2358
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2359
      _ErrorIf(not f_status, self.ENODEOS, node,
2360
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2361
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
2362
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2363
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2364
      # comparisons with the 'base' image
2365
      test = os_name not in base.oslist
2366
      _ErrorIf(test, self.ENODEOS, node,
2367
               "Extra OS %s not present on reference node (%s)",
2368
               os_name, base.name)
2369
      if test:
2370
        continue
2371
      assert base.oslist[os_name], "Base node has empty OS status?"
2372
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2373
      if not b_status:
2374
        # base OS is invalid, skipping
2375
        continue
2376
      for kind, a, b in [("API version", f_api, b_api),
2377
                         ("variants list", f_var, b_var),
2378
                         ("parameters", beautify_params(f_param),
2379
                          beautify_params(b_param))]:
2380
        _ErrorIf(a != b, self.ENODEOS, node,
2381
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2382
                 kind, os_name, base.name,
2383
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2384

    
2385
    # check any missing OSes
2386
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2387
    _ErrorIf(missing, self.ENODEOS, node,
2388
             "OSes present on reference node %s but missing on this node: %s",
2389
             base.name, utils.CommaJoin(missing))
2390

    
2391
  def _VerifyOob(self, ninfo, nresult):
2392
    """Verifies out of band functionality of a node.
2393

2394
    @type ninfo: L{objects.Node}
2395
    @param ninfo: the node to check
2396
    @param nresult: the remote results for the node
2397

2398
    """
2399
    node = ninfo.name
2400
    # We just have to verify the paths on master and/or master candidates
2401
    # as the oob helper is invoked on the master
2402
    if ((ninfo.master_candidate or ninfo.master_capable) and
2403
        constants.NV_OOB_PATHS in nresult):
2404
      for path_result in nresult[constants.NV_OOB_PATHS]:
2405
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
2406

    
2407
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2408
    """Verifies and updates the node volume data.
2409

2410
    This function will update a L{NodeImage}'s internal structures
2411
    with data from the remote call.
2412

2413
    @type ninfo: L{objects.Node}
2414
    @param ninfo: the node to check
2415
    @param nresult: the remote results for the node
2416
    @param nimg: the node image object
2417
    @param vg_name: the configured VG name
2418

2419
    """
2420
    node = ninfo.name
2421
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2422

    
2423
    nimg.lvm_fail = True
2424
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2425
    if vg_name is None:
2426
      pass
2427
    elif isinstance(lvdata, basestring):
2428
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2429
               utils.SafeEncode(lvdata))
2430
    elif not isinstance(lvdata, dict):
2431
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2432
    else:
2433
      nimg.volumes = lvdata
2434
      nimg.lvm_fail = False
2435

    
2436
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2437
    """Verifies and updates the node instance list.
2438

2439
    If the listing was successful, then updates this node's instance
2440
    list. Otherwise, it marks the RPC call as failed for the instance
2441
    list key.
2442

2443
    @type ninfo: L{objects.Node}
2444
    @param ninfo: the node to check
2445
    @param nresult: the remote results for the node
2446
    @param nimg: the node image object
2447

2448
    """
2449
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2450
    test = not isinstance(idata, list)
2451
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2452
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2453
    if test:
2454
      nimg.hyp_fail = True
2455
    else:
2456
      nimg.instances = idata
2457

    
2458
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2459
    """Verifies and computes a node information map
2460

2461
    @type ninfo: L{objects.Node}
2462
    @param ninfo: the node to check
2463
    @param nresult: the remote results for the node
2464
    @param nimg: the node image object
2465
    @param vg_name: the configured VG name
2466

2467
    """
2468
    node = ninfo.name
2469
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2470

    
2471
    # try to read free memory (from the hypervisor)
2472
    hv_info = nresult.get(constants.NV_HVINFO, None)
2473
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2474
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2475
    if not test:
2476
      try:
2477
        nimg.mfree = int(hv_info["memory_free"])
2478
      except (ValueError, TypeError):
2479
        _ErrorIf(True, self.ENODERPC, node,
2480
                 "node returned invalid nodeinfo, check hypervisor")
2481

    
2482
    # FIXME: devise a free space model for file based instances as well
2483
    if vg_name is not None:
2484
      test = (constants.NV_VGLIST not in nresult or
2485
              vg_name not in nresult[constants.NV_VGLIST])
2486
      _ErrorIf(test, self.ENODELVM, node,
2487
               "node didn't return data for the volume group '%s'"
2488
               " - it is either missing or broken", vg_name)
2489
      if not test:
2490
        try:
2491
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2492
        except (ValueError, TypeError):
2493
          _ErrorIf(True, self.ENODERPC, node,
2494
                   "node returned invalid LVM info, check LVM status")
2495

    
2496
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2497
    """Gets per-disk status information for all instances.
2498

2499
    @type nodelist: list of strings
2500
    @param nodelist: Node names
2501
    @type node_image: dict of (name, L{objects.Node})
2502
    @param node_image: Node objects
2503
    @type instanceinfo: dict of (name, L{objects.Instance})
2504
    @param instanceinfo: Instance objects
2505
    @rtype: {instance: {node: [(success, payload)]}}
2506
    @return: a dictionary of per-instance dictionaries with nodes as
2507
        keys and disk information as values; the disk information is a
2508
        list of tuples (success, payload)
2509

2510
    """
2511
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2512

    
2513
    node_disks = {}
2514
    node_disks_devonly = {}
2515
    diskless_instances = set()
2516
    diskless = constants.DT_DISKLESS
2517

    
2518
    for nname in nodelist:
2519
      node_instances = list(itertools.chain(node_image[nname].pinst,
2520
                                            node_image[nname].sinst))
2521
      diskless_instances.update(inst for inst in node_instances
2522
                                if instanceinfo[inst].disk_template == diskless)
2523
      disks = [(inst, disk)
2524
               for inst in node_instances
2525
               for disk in instanceinfo[inst].disks]
2526

    
2527
      if not disks:
2528
        # No need to collect data
2529
        continue
2530

    
2531
      node_disks[nname] = disks
2532

    
2533
      # Creating copies as SetDiskID below will modify the objects and that can
2534
      # lead to incorrect data returned from nodes
2535
      devonly = [dev.Copy() for (_, dev) in disks]
2536

    
2537
      for dev in devonly:
2538
        self.cfg.SetDiskID(dev, nname)
2539

    
2540
      node_disks_devonly[nname] = devonly
2541

    
2542
    assert len(node_disks) == len(node_disks_devonly)
2543

    
2544
    # Collect data from all nodes with disks
2545
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2546
                                                          node_disks_devonly)
2547

    
2548
    assert len(result) == len(node_disks)
2549

    
2550
    instdisk = {}
2551

    
2552
    for (nname, nres) in result.items():
2553
      disks = node_disks[nname]
2554

    
2555
      if nres.offline:
2556
        # No data from this node
2557
        data = len(disks) * [(False, "node offline")]
2558
      else:
2559
        msg = nres.fail_msg
2560
        _ErrorIf(msg, self.ENODERPC, nname,
2561
                 "while getting disk information: %s", msg)
2562
        if msg:
2563
          # No data from this node
2564
          data = len(disks) * [(False, msg)]
2565
        else:
2566
          data = []
2567
          for idx, i in enumerate(nres.payload):
2568
            if isinstance(i, (tuple, list)) and len(i) == 2:
2569
              data.append(i)
2570
            else:
2571
              logging.warning("Invalid result from node %s, entry %d: %s",
2572
                              nname, idx, i)
2573
              data.append((False, "Invalid result from the remote node"))
2574

    
2575
      for ((inst, _), status) in zip(disks, data):
2576
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2577

    
2578
    # Add empty entries for diskless instances.
2579
    for inst in diskless_instances:
2580
      assert inst not in instdisk
2581
      instdisk[inst] = {}
2582

    
2583
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2584
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2585
                      compat.all(isinstance(s, (tuple, list)) and
2586
                                 len(s) == 2 for s in statuses)
2587
                      for inst, nnames in instdisk.items()
2588
                      for nname, statuses in nnames.items())
2589
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2590

    
2591
    return instdisk
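    # For illustration (names made up): the returned structure looks like
    #   {"inst1.example.com": {"node1.example.com": [(True, status0),
    #                                                (False, "node offline")]}}
    # i.e. one (success, payload) pair per disk, per node, per instance.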
2592

    
2593
  @staticmethod
2594
  def _SshNodeSelector(group_uuid, all_nodes):
2595
    """Create endless iterators for all potential SSH check hosts.
2596

2597
    """
2598
    nodes = [node for node in all_nodes
2599
             if (node.group != group_uuid and
2600
                 not node.offline)]
2601
    keyfunc = operator.attrgetter("group")
2602

    
2603
    return map(itertools.cycle,
2604
               [sorted(map(operator.attrgetter("name"), names))
2605
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2606
                                                  keyfunc)])
2607

    
2608
  @classmethod
2609
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2610
    """Choose which nodes should talk to which other nodes.
2611

2612
    We will make nodes contact all nodes in their group, and one node from
2613
    every other group.
2614

2615
    @warning: This algorithm has a known issue if one node group is much
2616
      smaller than others (e.g. just one node). In such a case all other
2617
      nodes will talk to the single node.
2618

2619
    """
2620
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2621
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2622

    
2623
    return (online_nodes,
2624
            dict((name, sorted([i.next() for i in sel]))
2625
                 for name in online_nodes))
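    # For illustration (node names made up): for a group with online nodes
    # ["n1", "n2"] and two other groups, the result is roughly
    #   (["n1", "n2"], {"n1": ["g2a", "g3a"], "n2": ["g2b", "g3b"]})
    # i.e. each node is paired with one node from every other group, cycling
    # through the other groups' members.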
2626

    
2627
  def BuildHooksEnv(self):
2628
    """Build hooks env.
2629

2630
    Cluster-Verify hooks only run in the post phase; if they fail, their
2631
    output is logged in the verify output and the verification fails.
2632

2633
    """
2634
    env = {
2635
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2636
      }
2637

    
2638
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2639
               for node in self.my_node_info.values())
2640

    
2641
    return env
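    # For illustration (tags and host name made up), the resulting hook
    # environment looks like
    #   {"CLUSTER_TAGS": "prod web",
    #    "NODE_TAGS_node1.example.com": "rack1 ssd"}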
2642

    
2643
  def BuildHooksNodes(self):
2644
    """Build hooks nodes.
2645

2646
    """
2647
    return ([], self.my_node_names)
2648

    
2649
  def Exec(self, feedback_fn):
2650
    """Verify integrity of the node group, performing various test on nodes.
2651

2652
    """
2653
    # This method has too many local variables. pylint: disable=R0914
2654
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2655

    
2656
    if not self.my_node_names:
2657
      # empty node group
2658
      feedback_fn("* Empty node group, skipping verification")
2659
      return True
2660

    
2661
    self.bad = False
2662
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2663
    verbose = self.op.verbose
2664
    self._feedback_fn = feedback_fn
2665

    
2666
    vg_name = self.cfg.GetVGName()
2667
    drbd_helper = self.cfg.GetDRBDHelper()
2668
    cluster = self.cfg.GetClusterInfo()
2669
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2670
    hypervisors = cluster.enabled_hypervisors
2671
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2672

    
2673
    i_non_redundant = [] # Non redundant instances
2674
    i_non_a_balanced = [] # Non auto-balanced instances
2675
    n_offline = 0 # Count of offline nodes
2676
    n_drained = 0 # Count of nodes being drained
2677
    node_vol_should = {}
2678

    
2679
    # FIXME: verify OS list
2680

    
2681
    # File verification
2682
    filemap = _ComputeAncillaryFiles(cluster, False)
2683

    
2684
    # do local checksums
2685
    master_node = self.master_node = self.cfg.GetMasterNode()
2686
    master_ip = self.cfg.GetMasterIP()
2687

    
2688
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2689

    
2690
    node_verify_param = {
2691
      constants.NV_FILELIST:
2692
        utils.UniqueSequence(filename
2693
                             for files in filemap
2694
                             for filename in files),
2695
      constants.NV_NODELIST:
2696
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2697
                                  self.all_node_info.values()),
2698
      constants.NV_HYPERVISOR: hypervisors,
2699
      constants.NV_HVPARAMS:
2700
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2701
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2702
                                 for node in node_data_list
2703
                                 if not node.offline],
2704
      constants.NV_INSTANCELIST: hypervisors,
2705
      constants.NV_VERSION: None,
2706
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2707
      constants.NV_NODESETUP: None,
2708
      constants.NV_TIME: None,
2709
      constants.NV_MASTERIP: (master_node, master_ip),
2710
      constants.NV_OSLIST: None,
2711
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2712
      }
2713

    
2714
    if vg_name is not None:
2715
      node_verify_param[constants.NV_VGLIST] = None
2716
      node_verify_param[constants.NV_LVLIST] = vg_name
2717
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2718
      node_verify_param[constants.NV_DRBDLIST] = None
2719

    
2720
    if drbd_helper:
2721
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2722

    
2723
    # bridge checks
2724
    # FIXME: this needs to be changed per node-group, not cluster-wide
2725
    bridges = set()
2726
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2727
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2728
      bridges.add(default_nicpp[constants.NIC_LINK])
2729
    for instance in self.my_inst_info.values():
2730
      for nic in instance.nics:
2731
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2732
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2733
          bridges.add(full_nic[constants.NIC_LINK])
2734

    
2735
    if bridges:
2736
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2737

    
2738
    # Build our expected cluster state
2739
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2740
                                                 name=node.name,
2741
                                                 vm_capable=node.vm_capable))
2742
                      for node in node_data_list)
2743

    
2744
    # Gather OOB paths
2745
    oob_paths = []
2746
    for node in self.all_node_info.values():
2747
      path = _SupportsOob(self.cfg, node)
2748
      if path and path not in oob_paths:
2749
        oob_paths.append(path)
2750

    
2751
    if oob_paths:
2752
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2753

    
2754
    for instance in self.my_inst_names:
2755
      inst_config = self.my_inst_info[instance]
2756

    
2757
      for nname in inst_config.all_nodes:
2758
        if nname not in node_image:
2759
          gnode = self.NodeImage(name=nname)
2760
          gnode.ghost = (nname not in self.all_node_info)
2761
          node_image[nname] = gnode
2762

    
2763
      inst_config.MapLVsByNode(node_vol_should)
2764

    
2765
      pnode = inst_config.primary_node
2766
      node_image[pnode].pinst.append(instance)
2767

    
2768
      for snode in inst_config.secondary_nodes:
2769
        nimg = node_image[snode]
2770
        nimg.sinst.append(instance)
2771
        if pnode not in nimg.sbp:
2772
          nimg.sbp[pnode] = []
2773
        nimg.sbp[pnode].append(instance)
2774

    
2775
    # At this point, we have the in-memory data structures complete,
2776
    # except for the runtime information, which we'll gather next
2777

    
2778
    # Due to the way our RPC system works, exact response times cannot be
2779
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2780
    # time before and after executing the request, we can at least have a time
2781
    # window.
2782
    nvinfo_starttime = time.time()
2783
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if nodeinfo.vm_capable and not nodeinfo.offline:
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyOob(node_i, nresult)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      _ErrorIf(inst_config.admin_up and pnode_img.offline,
               self.EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               inst_config.primary_node)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)

      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if inst_config.disk_template in constants.DTS_INT_MIRROR:
        pnode = inst_config.primary_node
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
        instance_groups = {}

        for node in instance_nodes:
          instance_groups.setdefault(self.all_node_info[node].group,
                                     []).append(node)

        pretty_list = [
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
          # Sort so that we always list the primary node first.
          for group, nodes in sorted(instance_groups.items(),
                                     key=lambda (_, nodes): pnode in nodes,
                                     reverse=True)]

        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
                      instance, "instance has primary and secondary nodes in"
                      " different groups: %s", utils.CommaJoin(pretty_list),
                      code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance has offline secondary node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
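    # Illustrative fan-out (group names are hypothetical): with node groups
    # "group1" and "group2" owned, this builds two single-opcode jobs,
    # [OpGroupVerifyDisks(group_name="group1")] and
    # [OpGroupVerifyDisks(group_name="group2")], and returns them wrapped in
    # the ResultWithJobs container below.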
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetNodeGroupInstances(self.group_uuid)

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)

  def CheckPrereq(self):
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))

    # Check if node groups for locked instances are still correct
    _CheckInstancesNodeGroups(self.cfg, self.instances,
                              owned_groups, owned_nodes, self.group_uuid)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    res_nodes = {}
    res_instances = set()
    res_missing = {}

    nv_dict = _MapInstanceDisksToNodes([inst
                                        for inst in self.instances.values()
                                        if inst.admin_up])

    if nv_dict:
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                             set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(nodes, [])

      for (node, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
          res_nodes[node] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = nv_dict.pop((node, lv_name), None)
          if not (lv_online or inst is None):
            res_instances.add(inst)

      # any leftover items in nv_dict are missing LVs, let's arrange the data
      # better
      for key, inst in nv_dict.iteritems():
        res_missing.setdefault(inst, []).append(list(key))

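    # Illustrative shape of the return value (all names hypothetical; the
    # instance entries are whatever _MapInstanceDisksToNodes maps to):
    #   ({"node2": "error enumerating LVs ..."},   # node -> error message
    #    ["inst3"],                                # need activate-disks
    #    {"inst4": [["node5", "xenvg/disk0"]]})    # inst -> missing volumes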
    return (res_nodes, list(res_instances), res_missing)


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = {
      locking.LEVEL_NODE: 1,
      locking.LEVEL_INSTANCE: 0,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
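        # The reported size is presumably in bytes while disk.size is kept
        # in MiB, hence the >> 20 conversion before the comparison below.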
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_deactivate_master_ip(master)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_activate_master_ip(master)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

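    # Each of hidden_os/blacklisted_os is a list of (action, OS name) pairs
    # processed by helper_os above, e.g. (names purely illustrative):
    #   [(constants.DDM_ADD, "my-os"), (constants.DDM_REMOVE, "old-os")]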
    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master = self.cfg.GetMasterNode()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (self.cluster.master_netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      result = self.rpc.call_node_activate_master_ip(master)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
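  # Illustrative call, mirroring the usage in LUClusterRename.Exec above:
  #   _UploadHelper(lu, node_list, constants.SSH_KNOWN_HOSTS_FILE)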
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    constants.RAPI_USERS_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())
  else:
    # we need to ship at least the RAPI certificate
    files_all.add(constants.RAPI_CERT_FILE)

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
         "Optional file not in a different required list"

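  # The tuple below is unpacked by callers such as _RedistributeAncillaryFiles
  # (defined further down), which only pushes the "all nodes" and "vm-capable"
  # file sets and discards the optional list.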
  return (files_all, files_opt, files_mc, files_vm)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_activate_master_ip(master)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_deactivate_master_ip(master)
    result.Raise("Could not deactivate the master IP")


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
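  # With ldisk=True the check passes only if the local storage status is
  # constants.LDS_OKAY; otherwise it relies on the overall is_degraded flag
  # (see the payload checks below).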
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
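  # OOB commands that are not applied to the master node by default:
  # CheckPrereq below refuses them when the master is listed explicitly and
  # otherwise drops the master from the node list.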

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """