Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 566db1f2

History | View | Annotate | Download (479.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay to many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import logging
36
import copy
37
import OpenSSL
38
import socket
39
import tempfile
40
import shutil
41
import itertools
42
import operator
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import query
58
from ganeti import qlang
59
from ganeti import opcodes
60
from ganeti import ht
61
from ganeti import runtime
62

    
63
import ganeti.masterd.instance # pylint: disable=W0611
64

    
65

    
66
class ResultWithJobs:
67
  """Data container for LU results with jobs.
68

69
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
70
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
71
  contained in the C{jobs} attribute and include the job IDs in the opcode
72
  result.
73

74
  """
75
  def __init__(self, jobs, **kwargs):
76
    """Initializes this class.
77

78
    Additional return values can be specified as keyword arguments.
79

80
    @type jobs: list of lists of L{opcode.OpCode}
81
    @param jobs: A list of lists of opcode objects
82

83
    """
84
    self.jobs = jobs
85
    self.other = kwargs
86

    
87

    
88
class LogicalUnit(object):
89
  """Logical Unit base class.
90

91
  Subclasses must follow these rules:
92
    - implement ExpandNames
93
    - implement CheckPrereq (except when tasklets are used)
94
    - implement Exec (except when tasklets are used)
95
    - implement BuildHooksEnv
96
    - implement BuildHooksNodes
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
105

106
  """
107
  HPATH = None
108
  HTYPE = None
109
  REQ_BGL = True
110

    
111
  def __init__(self, processor, op, context, rpc):
112
    """Constructor for LogicalUnit.
113

114
    This needs to be overridden in derived classes in order to check op
115
    validity.
116

117
    """
118
    self.proc = processor
119
    self.op = op
120
    self.cfg = context.cfg
121
    self.glm = context.glm
122
    # readability alias
123
    self.owned_locks = context.glm.list_owned
124
    self.context = context
125
    self.rpc = rpc
126
    # Dicts used to declare locking needs to mcpu
127
    self.needed_locks = None
128
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
129
    self.add_locks = {}
130
    self.remove_locks = {}
131
    # Used to force good behavior when calling helper functions
132
    self.recalculate_locks = {}
133
    # logging
134
    self.Log = processor.Log # pylint: disable=C0103
135
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
136
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
137
    self.LogStep = processor.LogStep # pylint: disable=C0103
138
    # support for dry-run
139
    self.dry_run_result = None
140
    # support for generic debug attribute
141
    if (not hasattr(self.op, "debug_level") or
142
        not isinstance(self.op.debug_level, int)):
143
      self.op.debug_level = 0
144

    
145
    # Tasklets
146
    self.tasklets = None
147

    
148
    # Validate opcode parameters and set defaults
149
    self.op.Validate(True)
150

    
151
    self.CheckArguments()
152

    
153
  def CheckArguments(self):
154
    """Check syntactic validity for the opcode arguments.
155

156
    This method is for doing a simple syntactic check and ensure
157
    validity of opcode parameters, without any cluster-related
158
    checks. While the same can be accomplished in ExpandNames and/or
159
    CheckPrereq, doing these separate is better because:
160

161
      - ExpandNames is left as as purely a lock-related function
162
      - CheckPrereq is run after we have acquired locks (and possible
163
        waited for them)
164

165
    The function is allowed to change the self.op attribute so that
166
    later methods can no longer worry about missing parameters.
167

168
    """
169
    pass
170

    
171
  def ExpandNames(self):
172
    """Expand names for this LU.
173

174
    This method is called before starting to execute the opcode, and it should
175
    update all the parameters of the opcode to their canonical form (e.g. a
176
    short node name must be fully expanded after this method has successfully
177
    completed). This way locking, hooks, logging, etc. can work correctly.
178

179
    LUs which implement this method must also populate the self.needed_locks
180
    member, as a dict with lock levels as keys, and a list of needed lock names
181
    as values. Rules:
182

183
      - use an empty dict if you don't need any lock
184
      - if you don't need any lock at a particular level omit that level
185
      - don't put anything for the BGL level
186
      - if you want all locks at a level use locking.ALL_SET as a value
187

188
    If you need to share locks (rather than acquire them exclusively) at one
189
    level you can modify self.share_locks, setting a true value (usually 1) for
190
    that level. By default locks are not shared.
191

192
    This function can also define a list of tasklets, which then will be
193
    executed in order instead of the usual LU-level CheckPrereq and Exec
194
    functions, if those are not defined by the LU.
195

196
    Examples::
197

198
      # Acquire all nodes and one instance
199
      self.needed_locks = {
200
        locking.LEVEL_NODE: locking.ALL_SET,
201
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
202
      }
203
      # Acquire just two nodes
204
      self.needed_locks = {
205
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
206
      }
207
      # Acquire no locks
208
      self.needed_locks = {} # No, you can't leave it to the default value None
209

210
    """
211
    # The implementation of this method is mandatory only if the new LU is
212
    # concurrent, so that old LUs don't need to be changed all at the same
213
    # time.
214
    if self.REQ_BGL:
215
      self.needed_locks = {} # Exclusive LUs don't need locks.
216
    else:
217
      raise NotImplementedError
218

    
219
  def DeclareLocks(self, level):
220
    """Declare LU locking needs for a level
221

222
    While most LUs can just declare their locking needs at ExpandNames time,
223
    sometimes there's the need to calculate some locks after having acquired
224
    the ones before. This function is called just before acquiring locks at a
225
    particular level, but after acquiring the ones at lower levels, and permits
226
    such calculations. It can be used to modify self.needed_locks, and by
227
    default it does nothing.
228

229
    This function is only called if you have something already set in
230
    self.needed_locks for the level.
231

232
    @param level: Locking level which is going to be locked
233
    @type level: member of ganeti.locking.LEVELS
234

235
    """
236

    
237
  def CheckPrereq(self):
238
    """Check prerequisites for this LU.
239

240
    This method should check that the prerequisites for the execution
241
    of this LU are fulfilled. It can do internode communication, but
242
    it should be idempotent - no cluster or system changes are
243
    allowed.
244

245
    The method should raise errors.OpPrereqError in case something is
246
    not fulfilled. Its return value is ignored.
247

248
    This method should also update all the parameters of the opcode to
249
    their canonical form if it hasn't been done by ExpandNames before.
250

251
    """
252
    if self.tasklets is not None:
253
      for (idx, tl) in enumerate(self.tasklets):
254
        logging.debug("Checking prerequisites for tasklet %s/%s",
255
                      idx + 1, len(self.tasklets))
256
        tl.CheckPrereq()
257
    else:
258
      pass
259

    
260
  def Exec(self, feedback_fn):
261
    """Execute the LU.
262

263
    This method should implement the actual work. It should raise
264
    errors.OpExecError for failures that are somewhat dealt with in
265
    code, or expected.
266

267
    """
268
    if self.tasklets is not None:
269
      for (idx, tl) in enumerate(self.tasklets):
270
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
271
        tl.Exec(feedback_fn)
272
    else:
273
      raise NotImplementedError
274

    
275
  def BuildHooksEnv(self):
276
    """Build hooks environment for this LU.
277

278
    @rtype: dict
279
    @return: Dictionary containing the environment that will be used for
280
      running the hooks for this LU. The keys of the dict must not be prefixed
281
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
282
      will extend the environment with additional variables. If no environment
283
      should be defined, an empty dictionary should be returned (not C{None}).
284
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
285
      will not be called.
286

287
    """
288
    raise NotImplementedError
289

    
290
  def BuildHooksNodes(self):
291
    """Build list of nodes to run LU's hooks.
292

293
    @rtype: tuple; (list, list)
294
    @return: Tuple containing a list of node names on which the hook
295
      should run before the execution and a list of node names on which the
296
      hook should run after the execution. No nodes should be returned as an
297
      empty list (and not None).
298
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
299
      will not be called.
300

301
    """
302
    raise NotImplementedError
303

    
304
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
305
    """Notify the LU about the results of its hooks.
306

307
    This method is called every time a hooks phase is executed, and notifies
308
    the Logical Unit about the hooks' result. The LU can then use it to alter
309
    its result based on the hooks.  By default the method does nothing and the
310
    previous result is passed back unchanged but any LU can define it if it
311
    wants to use the local cluster hook-scripts somehow.
312

313
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
314
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
315
    @param hook_results: the results of the multi-node hooks rpc call
316
    @param feedback_fn: function used send feedback back to the caller
317
    @param lu_result: the previous Exec result this LU had, or None
318
        in the PRE phase
319
    @return: the new Exec result, based on the previous result
320
        and hook results
321

322
    """
323
    # API must be kept, thus we ignore the unused argument and could
324
    # be a function warnings
325
    # pylint: disable=W0613,R0201
326
    return lu_result
327

    
328
  def _ExpandAndLockInstance(self):
329
    """Helper function to expand and lock an instance.
330

331
    Many LUs that work on an instance take its name in self.op.instance_name
332
    and need to expand it and then declare the expanded name for locking. This
333
    function does it, and then updates self.op.instance_name to the expanded
334
    name. It also initializes needed_locks as a dict, if this hasn't been done
335
    before.
336

337
    """
338
    if self.needed_locks is None:
339
      self.needed_locks = {}
340
    else:
341
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
342
        "_ExpandAndLockInstance called with instance-level locks set"
343
    self.op.instance_name = _ExpandInstanceName(self.cfg,
344
                                                self.op.instance_name)
345
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
346

    
347
  def _LockInstancesNodes(self, primary_only=False):
348
    """Helper function to declare instances' nodes for locking.
349

350
    This function should be called after locking one or more instances to lock
351
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
352
    with all primary or secondary nodes for instances already locked and
353
    present in self.needed_locks[locking.LEVEL_INSTANCE].
354

355
    It should be called from DeclareLocks, and for safety only works if
356
    self.recalculate_locks[locking.LEVEL_NODE] is set.
357

358
    In the future it may grow parameters to just lock some instance's nodes, or
359
    to just lock primaries or secondary nodes, if needed.
360

361
    If should be called in DeclareLocks in a way similar to::
362

363
      if level == locking.LEVEL_NODE:
364
        self._LockInstancesNodes()
365

366
    @type primary_only: boolean
367
    @param primary_only: only lock primary nodes of locked instances
368

369
    """
370
    assert locking.LEVEL_NODE in self.recalculate_locks, \
371
      "_LockInstancesNodes helper function called with no nodes to recalculate"
372

    
373
    # TODO: check if we're really been called with the instance locks held
374

    
375
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
376
    # future we might want to have different behaviors depending on the value
377
    # of self.recalculate_locks[locking.LEVEL_NODE]
378
    wanted_nodes = []
379
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
380
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
381
      wanted_nodes.append(instance.primary_node)
382
      if not primary_only:
383
        wanted_nodes.extend(instance.secondary_nodes)
384

    
385
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
386
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
387
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
388
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
389

    
390
    del self.recalculate_locks[locking.LEVEL_NODE]
391

    
392

    
393
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
394
  """Simple LU which runs no hooks.
395

396
  This LU is intended as a parent for other LogicalUnits which will
397
  run no hooks, in order to reduce duplicate code.
398

399
  """
400
  HPATH = None
401
  HTYPE = None
402

    
403
  def BuildHooksEnv(self):
404
    """Empty BuildHooksEnv for NoHooksLu.
405

406
    This just raises an error.
407

408
    """
409
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
410

    
411
  def BuildHooksNodes(self):
412
    """Empty BuildHooksNodes for NoHooksLU.
413

414
    """
415
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
416

    
417

    
418
class Tasklet:
419
  """Tasklet base class.
420

421
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
422
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
423
  tasklets know nothing about locks.
424

425
  Subclasses must follow these rules:
426
    - Implement CheckPrereq
427
    - Implement Exec
428

429
  """
430
  def __init__(self, lu):
431
    self.lu = lu
432

    
433
    # Shortcuts
434
    self.cfg = lu.cfg
435
    self.rpc = lu.rpc
436

    
437
  def CheckPrereq(self):
438
    """Check prerequisites for this tasklets.
439

440
    This method should check whether the prerequisites for the execution of
441
    this tasklet are fulfilled. It can do internode communication, but it
442
    should be idempotent - no cluster or system changes are allowed.
443

444
    The method should raise errors.OpPrereqError in case something is not
445
    fulfilled. Its return value is ignored.
446

447
    This method should also update all parameters to their canonical form if it
448
    hasn't been done before.
449

450
    """
451
    pass
452

    
453
  def Exec(self, feedback_fn):
454
    """Execute the tasklet.
455

456
    This method should implement the actual work. It should raise
457
    errors.OpExecError for failures that are somewhat dealt with in code, or
458
    expected.
459

460
    """
461
    raise NotImplementedError
462

    
463

    
464
class _QueryBase:
465
  """Base for query utility classes.
466

467
  """
468
  #: Attribute holding field definitions
469
  FIELDS = None
470

    
471
  def __init__(self, filter_, fields, use_locking):
472
    """Initializes this class.
473

474
    """
475
    self.use_locking = use_locking
476

    
477
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
478
                             namefield="name")
479
    self.requested_data = self.query.RequestedData()
480
    self.names = self.query.RequestedNames()
481

    
482
    # Sort only if no names were requested
483
    self.sort_by_name = not self.names
484

    
485
    self.do_locking = None
486
    self.wanted = None
487

    
488
  def _GetNames(self, lu, all_names, lock_level):
489
    """Helper function to determine names asked for in the query.
490

491
    """
492
    if self.do_locking:
493
      names = lu.owned_locks(lock_level)
494
    else:
495
      names = all_names
496

    
497
    if self.wanted == locking.ALL_SET:
498
      assert not self.names
499
      # caller didn't specify names, so ordering is not important
500
      return utils.NiceSort(names)
501

    
502
    # caller specified names and we must keep the same order
503
    assert self.names
504
    assert not self.do_locking or lu.glm.is_owned(lock_level)
505

    
506
    missing = set(self.wanted).difference(names)
507
    if missing:
508
      raise errors.OpExecError("Some items were removed before retrieving"
509
                               " their data: %s" % missing)
510

    
511
    # Return expanded names
512
    return self.wanted
513

    
514
  def ExpandNames(self, lu):
515
    """Expand names for this query.
516

517
    See L{LogicalUnit.ExpandNames}.
518

519
    """
520
    raise NotImplementedError()
521

    
522
  def DeclareLocks(self, lu, level):
523
    """Declare locks for this query.
524

525
    See L{LogicalUnit.DeclareLocks}.
526

527
    """
528
    raise NotImplementedError()
529

    
530
  def _GetQueryData(self, lu):
531
    """Collects all data for this query.
532

533
    @return: Query data object
534

535
    """
536
    raise NotImplementedError()
537

    
538
  def NewStyleQuery(self, lu):
539
    """Collect data and execute query.
540

541
    """
542
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
543
                                  sort_by_name=self.sort_by_name)
544

    
545
  def OldStyleQuery(self, lu):
546
    """Collect data and execute query.
547

548
    """
549
    return self.query.OldStyleQuery(self._GetQueryData(lu),
550
                                    sort_by_name=self.sort_by_name)
551

    
552

    
553
def _ShareAll():
554
  """Returns a dict declaring all lock levels shared.
555

556
  """
557
  return dict.fromkeys(locking.LEVELS, 1)
558

    
559

    
560
def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
561
                              cur_group_uuid):
562
  """Checks if node groups for locked instances are still correct.
563

564
  @type cfg: L{config.ConfigWriter}
565
  @param cfg: Cluster configuration
566
  @type instances: dict; string as key, L{objects.Instance} as value
567
  @param instances: Dictionary, instance name as key, instance object as value
568
  @type owned_groups: iterable of string
569
  @param owned_groups: List of owned groups
570
  @type owned_nodes: iterable of string
571
  @param owned_nodes: List of owned nodes
572
  @type cur_group_uuid: string or None
573
  @param cur_group_uuid: Optional group UUID to check against instance's groups
574

575
  """
576
  for (name, inst) in instances.items():
577
    assert owned_nodes.issuperset(inst.all_nodes), \
578
      "Instance %s's nodes changed while we kept the lock" % name
579

    
580
    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
581

    
582
    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
583
      "Instance %s has no node in group %s" % (name, cur_group_uuid)
584

    
585

    
586
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
587
  """Checks if the owned node groups are still correct for an instance.
588

589
  @type cfg: L{config.ConfigWriter}
590
  @param cfg: The cluster configuration
591
  @type instance_name: string
592
  @param instance_name: Instance name
593
  @type owned_groups: set or frozenset
594
  @param owned_groups: List of currently owned node groups
595

596
  """
597
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
598

    
599
  if not owned_groups.issuperset(inst_groups):
600
    raise errors.OpPrereqError("Instance %s's node groups changed since"
601
                               " locks were acquired, current groups are"
602
                               " are '%s', owning groups '%s'; retry the"
603
                               " operation" %
604
                               (instance_name,
605
                                utils.CommaJoin(inst_groups),
606
                                utils.CommaJoin(owned_groups)),
607
                               errors.ECODE_STATE)
608

    
609
  return inst_groups
610

    
611

    
612
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
613
  """Checks if the instances in a node group are still correct.
614

615
  @type cfg: L{config.ConfigWriter}
616
  @param cfg: The cluster configuration
617
  @type group_uuid: string
618
  @param group_uuid: Node group UUID
619
  @type owned_instances: set or frozenset
620
  @param owned_instances: List of currently owned instances
621

622
  """
623
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
624
  if owned_instances != wanted_instances:
625
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
626
                               " locks were acquired, wanted '%s', have '%s';"
627
                               " retry the operation" %
628
                               (group_uuid,
629
                                utils.CommaJoin(wanted_instances),
630
                                utils.CommaJoin(owned_instances)),
631
                               errors.ECODE_STATE)
632

    
633
  return wanted_instances
634

    
635

    
636
def _SupportsOob(cfg, node):
637
  """Tells if node supports OOB.
638

639
  @type cfg: L{config.ConfigWriter}
640
  @param cfg: The cluster configuration
641
  @type node: L{objects.Node}
642
  @param node: The node
643
  @return: The OOB script if supported or an empty string otherwise
644

645
  """
646
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
647

    
648

    
649
def _GetWantedNodes(lu, nodes):
650
  """Returns list of checked and expanded node names.
651

652
  @type lu: L{LogicalUnit}
653
  @param lu: the logical unit on whose behalf we execute
654
  @type nodes: list
655
  @param nodes: list of node names or None for all nodes
656
  @rtype: list
657
  @return: the list of nodes, sorted
658
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
659

660
  """
661
  if nodes:
662
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
663

    
664
  return utils.NiceSort(lu.cfg.GetNodeList())
665

    
666

    
667
def _GetWantedInstances(lu, instances):
668
  """Returns list of checked and expanded instance names.
669

670
  @type lu: L{LogicalUnit}
671
  @param lu: the logical unit on whose behalf we execute
672
  @type instances: list
673
  @param instances: list of instance names or None for all instances
674
  @rtype: list
675
  @return: the list of instances, sorted
676
  @raise errors.OpPrereqError: if the instances parameter is wrong type
677
  @raise errors.OpPrereqError: if any of the passed instances is not found
678

679
  """
680
  if instances:
681
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
682
  else:
683
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
684
  return wanted
685

    
686

    
687
def _GetUpdatedParams(old_params, update_dict,
688
                      use_default=True, use_none=False):
689
  """Return the new version of a parameter dictionary.
690

691
  @type old_params: dict
692
  @param old_params: old parameters
693
  @type update_dict: dict
694
  @param update_dict: dict containing new parameter values, or
695
      constants.VALUE_DEFAULT to reset the parameter to its default
696
      value
697
  @param use_default: boolean
698
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
699
      values as 'to be deleted' values
700
  @param use_none: boolean
701
  @type use_none: whether to recognise C{None} values as 'to be
702
      deleted' values
703
  @rtype: dict
704
  @return: the new parameter dictionary
705

706
  """
707
  params_copy = copy.deepcopy(old_params)
708
  for key, val in update_dict.iteritems():
709
    if ((use_default and val == constants.VALUE_DEFAULT) or
710
        (use_none and val is None)):
711
      try:
712
        del params_copy[key]
713
      except KeyError:
714
        pass
715
    else:
716
      params_copy[key] = val
717
  return params_copy
718

    
719

    
720
def _ReleaseLocks(lu, level, names=None, keep=None):
721
  """Releases locks owned by an LU.
722

723
  @type lu: L{LogicalUnit}
724
  @param level: Lock level
725
  @type names: list or None
726
  @param names: Names of locks to release
727
  @type keep: list or None
728
  @param keep: Names of locks to retain
729

730
  """
731
  assert not (keep is not None and names is not None), \
732
         "Only one of the 'names' and the 'keep' parameters can be given"
733

    
734
  if names is not None:
735
    should_release = names.__contains__
736
  elif keep:
737
    should_release = lambda name: name not in keep
738
  else:
739
    should_release = None
740

    
741
  if should_release:
742
    retain = []
743
    release = []
744

    
745
    # Determine which locks to release
746
    for name in lu.owned_locks(level):
747
      if should_release(name):
748
        release.append(name)
749
      else:
750
        retain.append(name)
751

    
752
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
753

    
754
    # Release just some locks
755
    lu.glm.release(level, names=release)
756

    
757
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
758
  else:
759
    # Release everything
760
    lu.glm.release(level)
761

    
762
    assert not lu.glm.is_owned(level), "No locks should be owned"
763

    
764

    
765
def _MapInstanceDisksToNodes(instances):
766
  """Creates a map from (node, volume) to instance name.
767

768
  @type instances: list of L{objects.Instance}
769
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
770

771
  """
772
  return dict(((node, vol), inst.name)
773
              for inst in instances
774
              for (node, vols) in inst.MapLVsByNode().items()
775
              for vol in vols)
776

    
777

    
778
def _RunPostHook(lu, node_name):
779
  """Runs the post-hook for an opcode on a single node.
780

781
  """
782
  hm = lu.proc.BuildHooksManager(lu)
783
  try:
784
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
785
  except:
786
    # pylint: disable=W0702
787
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
788

    
789

    
790
def _CheckOutputFields(static, dynamic, selected):
791
  """Checks whether all selected fields are valid.
792

793
  @type static: L{utils.FieldSet}
794
  @param static: static fields set
795
  @type dynamic: L{utils.FieldSet}
796
  @param dynamic: dynamic fields set
797

798
  """
799
  f = utils.FieldSet()
800
  f.Extend(static)
801
  f.Extend(dynamic)
802

    
803
  delta = f.NonMatching(selected)
804
  if delta:
805
    raise errors.OpPrereqError("Unknown output fields selected: %s"
806
                               % ",".join(delta), errors.ECODE_INVAL)
807

    
808

    
809
def _CheckGlobalHvParams(params):
810
  """Validates that given hypervisor params are not global ones.
811

812
  This will ensure that instances don't get customised versions of
813
  global params.
814

815
  """
816
  used_globals = constants.HVC_GLOBALS.intersection(params)
817
  if used_globals:
818
    msg = ("The following hypervisor parameters are global and cannot"
819
           " be customized at instance level, please modify them at"
820
           " cluster level: %s" % utils.CommaJoin(used_globals))
821
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
822

    
823

    
824
def _CheckNodeOnline(lu, node, msg=None):
825
  """Ensure that a given node is online.
826

827
  @param lu: the LU on behalf of which we make the check
828
  @param node: the node to check
829
  @param msg: if passed, should be a message to replace the default one
830
  @raise errors.OpPrereqError: if the node is offline
831

832
  """
833
  if msg is None:
834
    msg = "Can't use offline node"
835
  if lu.cfg.GetNodeInfo(node).offline:
836
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
837

    
838

    
839
def _CheckNodeNotDrained(lu, node):
840
  """Ensure that a given node is not drained.
841

842
  @param lu: the LU on behalf of which we make the check
843
  @param node: the node to check
844
  @raise errors.OpPrereqError: if the node is drained
845

846
  """
847
  if lu.cfg.GetNodeInfo(node).drained:
848
    raise errors.OpPrereqError("Can't use drained node %s" % node,
849
                               errors.ECODE_STATE)
850

    
851

    
852
def _CheckNodeVmCapable(lu, node):
853
  """Ensure that a given node is vm capable.
854

855
  @param lu: the LU on behalf of which we make the check
856
  @param node: the node to check
857
  @raise errors.OpPrereqError: if the node is not vm capable
858

859
  """
860
  if not lu.cfg.GetNodeInfo(node).vm_capable:
861
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
862
                               errors.ECODE_STATE)
863

    
864

    
865
def _CheckNodeHasOS(lu, node, os_name, force_variant):
866
  """Ensure that a node supports a given OS.
867

868
  @param lu: the LU on behalf of which we make the check
869
  @param node: the node to check
870
  @param os_name: the OS to query about
871
  @param force_variant: whether to ignore variant errors
872
  @raise errors.OpPrereqError: if the node is not supporting the OS
873

874
  """
875
  result = lu.rpc.call_os_get(node, os_name)
876
  result.Raise("OS '%s' not in supported OS list for node %s" %
877
               (os_name, node),
878
               prereq=True, ecode=errors.ECODE_INVAL)
879
  if not force_variant:
880
    _CheckOSVariant(result.payload, os_name)
881

    
882

    
883
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
884
  """Ensure that a node has the given secondary ip.
885

886
  @type lu: L{LogicalUnit}
887
  @param lu: the LU on behalf of which we make the check
888
  @type node: string
889
  @param node: the node to check
890
  @type secondary_ip: string
891
  @param secondary_ip: the ip to check
892
  @type prereq: boolean
893
  @param prereq: whether to throw a prerequisite or an execute error
894
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
895
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
896

897
  """
898
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
899
  result.Raise("Failure checking secondary ip on node %s" % node,
900
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
901
  if not result.payload:
902
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
903
           " please fix and re-run this command" % secondary_ip)
904
    if prereq:
905
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
906
    else:
907
      raise errors.OpExecError(msg)
908

    
909

    
910
def _GetClusterDomainSecret():
911
  """Reads the cluster domain secret.
912

913
  """
914
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
915
                               strict=True)
916

    
917

    
918
def _CheckInstanceDown(lu, instance, reason):
919
  """Ensure that an instance is not running."""
920
  if instance.admin_up:
921
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
922
                               (instance.name, reason), errors.ECODE_STATE)
923

    
924
  pnode = instance.primary_node
925
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
926
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
927
              prereq=True, ecode=errors.ECODE_ENVIRON)
928

    
929
  if instance.name in ins_l.payload:
930
    raise errors.OpPrereqError("Instance %s is running, %s" %
931
                               (instance.name, reason), errors.ECODE_STATE)
932

    
933

    
934
def _ExpandItemName(fn, name, kind):
935
  """Expand an item name.
936

937
  @param fn: the function to use for expansion
938
  @param name: requested item name
939
  @param kind: text description ('Node' or 'Instance')
940
  @return: the resolved (full) name
941
  @raise errors.OpPrereqError: if the item is not found
942

943
  """
944
  full_name = fn(name)
945
  if full_name is None:
946
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
947
                               errors.ECODE_NOENT)
948
  return full_name
949

    
950

    
951
def _ExpandNodeName(cfg, name):
952
  """Wrapper over L{_ExpandItemName} for nodes."""
953
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
954

    
955

    
956
def _ExpandInstanceName(cfg, name):
957
  """Wrapper over L{_ExpandItemName} for instance."""
958
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
959

    
960

    
961
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
962
                          memory, vcpus, nics, disk_template, disks,
963
                          bep, hvp, hypervisor_name, tags):
964
  """Builds instance related env variables for hooks
965

966
  This builds the hook environment from individual variables.
967

968
  @type name: string
969
  @param name: the name of the instance
970
  @type primary_node: string
971
  @param primary_node: the name of the instance's primary node
972
  @type secondary_nodes: list
973
  @param secondary_nodes: list of secondary nodes as strings
974
  @type os_type: string
975
  @param os_type: the name of the instance's OS
976
  @type status: boolean
977
  @param status: the should_run status of the instance
978
  @type memory: string
979
  @param memory: the memory size of the instance
980
  @type vcpus: string
981
  @param vcpus: the count of VCPUs the instance has
982
  @type nics: list
983
  @param nics: list of tuples (ip, mac, mode, link) representing
984
      the NICs the instance has
985
  @type disk_template: string
986
  @param disk_template: the disk template of the instance
987
  @type disks: list
988
  @param disks: the list of (size, mode) pairs
989
  @type bep: dict
990
  @param bep: the backend parameters for the instance
991
  @type hvp: dict
992
  @param hvp: the hypervisor parameters for the instance
993
  @type hypervisor_name: string
994
  @param hypervisor_name: the hypervisor for the instance
995
  @type tags: list
996
  @param tags: list of instance tags as strings
997
  @rtype: dict
998
  @return: the hook environment for this instance
999

1000
  """
1001
  if status:
1002
    str_status = "up"
1003
  else:
1004
    str_status = "down"
1005
  env = {
1006
    "OP_TARGET": name,
1007
    "INSTANCE_NAME": name,
1008
    "INSTANCE_PRIMARY": primary_node,
1009
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
1010
    "INSTANCE_OS_TYPE": os_type,
1011
    "INSTANCE_STATUS": str_status,
1012
    "INSTANCE_MEMORY": memory,
1013
    "INSTANCE_VCPUS": vcpus,
1014
    "INSTANCE_DISK_TEMPLATE": disk_template,
1015
    "INSTANCE_HYPERVISOR": hypervisor_name,
1016
  }
1017

    
1018
  if nics:
1019
    nic_count = len(nics)
1020
    for idx, (ip, mac, mode, link) in enumerate(nics):
1021
      if ip is None:
1022
        ip = ""
1023
      env["INSTANCE_NIC%d_IP" % idx] = ip
1024
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1025
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1026
      env["INSTANCE_NIC%d_LINK" % idx] = link
1027
      if mode == constants.NIC_MODE_BRIDGED:
1028
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1029
  else:
1030
    nic_count = 0
1031

    
1032
  env["INSTANCE_NIC_COUNT"] = nic_count
1033

    
1034
  if disks:
1035
    disk_count = len(disks)
1036
    for idx, (size, mode) in enumerate(disks):
1037
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1038
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1039
  else:
1040
    disk_count = 0
1041

    
1042
  env["INSTANCE_DISK_COUNT"] = disk_count
1043

    
1044
  if not tags:
1045
    tags = []
1046

    
1047
  env["INSTANCE_TAGS"] = " ".join(tags)
1048

    
1049
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1050
    for key, value in source.items():
1051
      env["INSTANCE_%s_%s" % (kind, key)] = value
1052

    
1053
  return env
1054

    
1055

    
1056
def _NICListToTuple(lu, nics):
1057
  """Build a list of nic information tuples.
1058

1059
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1060
  value in LUInstanceQueryData.
1061

1062
  @type lu:  L{LogicalUnit}
1063
  @param lu: the logical unit on whose behalf we execute
1064
  @type nics: list of L{objects.NIC}
1065
  @param nics: list of nics to convert to hooks tuples
1066

1067
  """
1068
  hooks_nics = []
1069
  cluster = lu.cfg.GetClusterInfo()
1070
  for nic in nics:
1071
    ip = nic.ip
1072
    mac = nic.mac
1073
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1074
    mode = filled_params[constants.NIC_MODE]
1075
    link = filled_params[constants.NIC_LINK]
1076
    hooks_nics.append((ip, mac, mode, link))
1077
  return hooks_nics
1078

    
1079

    
1080
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1081
  """Builds instance related env variables for hooks from an object.
1082

1083
  @type lu: L{LogicalUnit}
1084
  @param lu: the logical unit on whose behalf we execute
1085
  @type instance: L{objects.Instance}
1086
  @param instance: the instance for which we should build the
1087
      environment
1088
  @type override: dict
1089
  @param override: dictionary with key/values that will override
1090
      our values
1091
  @rtype: dict
1092
  @return: the hook environment dictionary
1093

1094
  """
1095
  cluster = lu.cfg.GetClusterInfo()
1096
  bep = cluster.FillBE(instance)
1097
  hvp = cluster.FillHV(instance)
1098
  args = {
1099
    "name": instance.name,
1100
    "primary_node": instance.primary_node,
1101
    "secondary_nodes": instance.secondary_nodes,
1102
    "os_type": instance.os,
1103
    "status": instance.admin_up,
1104
    "memory": bep[constants.BE_MEMORY],
1105
    "vcpus": bep[constants.BE_VCPUS],
1106
    "nics": _NICListToTuple(lu, instance.nics),
1107
    "disk_template": instance.disk_template,
1108
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1109
    "bep": bep,
1110
    "hvp": hvp,
1111
    "hypervisor_name": instance.hypervisor,
1112
    "tags": instance.tags,
1113
  }
1114
  if override:
1115
    args.update(override)
1116
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1117

    
1118

    
1119
def _AdjustCandidatePool(lu, exceptions):
1120
  """Adjust the candidate pool after node operations.
1121

1122
  """
1123
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1124
  if mod_list:
1125
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1126
               utils.CommaJoin(node.name for node in mod_list))
1127
    for name in mod_list:
1128
      lu.context.ReaddNode(name)
1129
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1130
  if mc_now > mc_max:
1131
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1132
               (mc_now, mc_max))
1133

    
1134

    
1135
def _DecideSelfPromotion(lu, exceptions=None):
1136
  """Decide whether I should promote myself as a master candidate.
1137

1138
  """
1139
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1140
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1141
  # the new node will increase mc_max with one, so:
1142
  mc_should = min(mc_should + 1, cp_size)
1143
  return mc_now < mc_should
1144

    
1145

    
1146
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1147
  """Check that the brigdes needed by a list of nics exist.
1148

1149
  """
1150
  cluster = lu.cfg.GetClusterInfo()
1151
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1152
  brlist = [params[constants.NIC_LINK] for params in paramslist
1153
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1154
  if brlist:
1155
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1156
    result.Raise("Error checking bridges on destination node '%s'" %
1157
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1158

    
1159

    
1160
def _CheckInstanceBridgesExist(lu, instance, node=None):
1161
  """Check that the brigdes needed by an instance exist.
1162

1163
  """
1164
  if node is None:
1165
    node = instance.primary_node
1166
  _CheckNicsBridgesExist(lu, instance.nics, node)
1167

    
1168

    
1169
def _CheckOSVariant(os_obj, name):
1170
  """Check whether an OS name conforms to the os variants specification.
1171

1172
  @type os_obj: L{objects.OS}
1173
  @param os_obj: OS object to check
1174
  @type name: string
1175
  @param name: OS name passed by the user, to check for validity
1176

1177
  """
1178
  variant = objects.OS.GetVariant(name)
1179
  if not os_obj.supported_variants:
1180
    if variant:
1181
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1182
                                 " passed)" % (os_obj.name, variant),
1183
                                 errors.ECODE_INVAL)
1184
    return
1185
  if not variant:
1186
    raise errors.OpPrereqError("OS name must include a variant",
1187
                               errors.ECODE_INVAL)
1188

    
1189
  if variant not in os_obj.supported_variants:
1190
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1191

    
1192

    
1193
def _GetNodeInstancesInner(cfg, fn):
1194
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1195

    
1196

    
1197
def _GetNodeInstances(cfg, node_name):
1198
  """Returns a list of all primary and secondary instances on a node.
1199

1200
  """
1201

    
1202
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1203

    
1204

    
1205
def _GetNodePrimaryInstances(cfg, node_name):
1206
  """Returns primary instances on a node.
1207

1208
  """
1209
  return _GetNodeInstancesInner(cfg,
1210
                                lambda inst: node_name == inst.primary_node)
1211

    
1212

    
1213
def _GetNodeSecondaryInstances(cfg, node_name):
1214
  """Returns secondary instances on a node.
1215

1216
  """
1217
  return _GetNodeInstancesInner(cfg,
1218
                                lambda inst: node_name in inst.secondary_nodes)
1219

    
1220

    
1221
def _GetStorageTypeArgs(cfg, storage_type):
1222
  """Returns the arguments for a storage type.
1223

1224
  """
1225
  # Special case for file storage
1226
  if storage_type == constants.ST_FILE:
1227
    # storage.FileStorage wants a list of storage directories
1228
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1229

    
1230
  return []
1231

    
1232

    
1233
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1234
  faulty = []
1235

    
1236
  for dev in instance.disks:
1237
    cfg.SetDiskID(dev, node_name)
1238

    
1239
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1240
  result.Raise("Failed to get disk status from node %s" % node_name,
1241
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1242

    
1243
  for idx, bdev_status in enumerate(result.payload):
1244
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1245
      faulty.append(idx)
1246

    
1247
  return faulty
1248

    
1249

    
1250
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1251
  """Check the sanity of iallocator and node arguments and use the
1252
  cluster-wide iallocator if appropriate.
1253

1254
  Check that at most one of (iallocator, node) is specified. If none is
1255
  specified, then the LU's opcode's iallocator slot is filled with the
1256
  cluster-wide default iallocator.
1257

1258
  @type iallocator_slot: string
1259
  @param iallocator_slot: the name of the opcode iallocator slot
1260
  @type node_slot: string
1261
  @param node_slot: the name of the opcode target node slot
1262

1263
  """
1264
  node = getattr(lu.op, node_slot, None)
1265
  iallocator = getattr(lu.op, iallocator_slot, None)
1266

    
1267
  if node is not None and iallocator is not None:
1268
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1269
                               errors.ECODE_INVAL)
1270
  elif node is None and iallocator is None:
1271
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1272
    if default_iallocator:
1273
      setattr(lu.op, iallocator_slot, default_iallocator)
1274
    else:
1275
      raise errors.OpPrereqError("No iallocator or node given and no"
1276
                                 " cluster-wide default iallocator found;"
1277
                                 " please specify either an iallocator or a"
1278
                                 " node, or set a cluster-wide default"
1279
                                 " iallocator")
1280

    
1281

    
1282
def _GetDefaultIAllocator(cfg, iallocator):
1283
  """Decides on which iallocator to use.
1284

1285
  @type cfg: L{config.ConfigWriter}
1286
  @param cfg: Cluster configuration object
1287
  @type iallocator: string or None
1288
  @param iallocator: Iallocator specified in opcode
1289
  @rtype: string
1290
  @return: Iallocator name
1291

1292
  """
1293
  if not iallocator:
1294
    # Use default iallocator
1295
    iallocator = cfg.GetDefaultIAllocator()
1296

    
1297
  if not iallocator:
1298
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1299
                               " opcode nor as a cluster-wide default",
1300
                               errors.ECODE_INVAL)
1301

    
1302
  return iallocator
1303

    
1304

    
1305
class LUClusterPostInit(LogicalUnit):
1306
  """Logical unit for running hooks after cluster initialization.
1307

1308
  """
1309
  HPATH = "cluster-init"
1310
  HTYPE = constants.HTYPE_CLUSTER
1311

    
1312
  def BuildHooksEnv(self):
1313
    """Build hooks env.
1314

1315
    """
1316
    return {
1317
      "OP_TARGET": self.cfg.GetClusterName(),
1318
      }
1319

    
1320
  def BuildHooksNodes(self):
1321
    """Build hooks nodes.
1322

1323
    """
1324
    return ([], [self.cfg.GetMasterNode()])
1325

    
1326
  def Exec(self, feedback_fn):
1327
    """Nothing to do.
1328

1329
    """
1330
    return True
1331

    
1332

    
1333
class LUClusterDestroy(LogicalUnit):
1334
  """Logical unit for destroying the cluster.
1335

1336
  """
1337
  HPATH = "cluster-destroy"
1338
  HTYPE = constants.HTYPE_CLUSTER
1339

    
1340
  def BuildHooksEnv(self):
1341
    """Build hooks env.
1342

1343
    """
1344
    return {
1345
      "OP_TARGET": self.cfg.GetClusterName(),
1346
      }
1347

    
1348
  def BuildHooksNodes(self):
1349
    """Build hooks nodes.
1350

1351
    """
1352
    return ([], [])
1353

    
1354
  def CheckPrereq(self):
1355
    """Check prerequisites.
1356

1357
    This checks whether the cluster is empty.
1358

1359
    Any errors are signaled by raising errors.OpPrereqError.
1360

1361
    """
1362
    master = self.cfg.GetMasterNode()
1363

    
1364
    nodelist = self.cfg.GetNodeList()
1365
    if len(nodelist) != 1 or nodelist[0] != master:
1366
      raise errors.OpPrereqError("There are still %d node(s) in"
1367
                                 " this cluster." % (len(nodelist) - 1),
1368
                                 errors.ECODE_INVAL)
1369
    instancelist = self.cfg.GetInstanceList()
1370
    if instancelist:
1371
      raise errors.OpPrereqError("There are still %d instance(s) in"
1372
                                 " this cluster." % len(instancelist),
1373
                                 errors.ECODE_INVAL)
1374

    
1375
  def Exec(self, feedback_fn):
1376
    """Destroys the cluster.
1377

1378
    """
1379
    master = self.cfg.GetMasterNode()
1380

    
1381
    # Run post hooks on master node before it's removed
1382
    _RunPostHook(self, master)
1383

    
1384
    result = self.rpc.call_node_deactivate_master_ip(master)
1385
    result.Raise("Could not disable the master role")
1386

    
1387
    return master
1388

    
1389

    
1390
def _VerifyCertificate(filename):
1391
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1392

1393
  @type filename: string
1394
  @param filename: Path to PEM file
1395

1396
  """
1397
  try:
1398
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1399
                                           utils.ReadFile(filename))
1400
  except Exception, err: # pylint: disable=W0703
1401
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1402
            "Failed to load X509 certificate %s: %s" % (filename, err))
1403

    
1404
  (errcode, msg) = \
1405
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1406
                                constants.SSL_CERT_EXPIRATION_ERROR)
1407

    
1408
  if msg:
1409
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1410
  else:
1411
    fnamemsg = None
1412

    
1413
  if errcode is None:
1414
    return (None, fnamemsg)
1415
  elif errcode == utils.CERT_WARNING:
1416
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1417
  elif errcode == utils.CERT_ERROR:
1418
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1419

    
1420
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1421

    
1422

    
1423
def _GetAllHypervisorParameters(cluster, instances):
1424
  """Compute the set of all hypervisor parameters.
1425

1426
  @type cluster: L{objects.Cluster}
1427
  @param cluster: the cluster object
1428
  @param instances: list of L{objects.Instance}
1429
  @param instances: additional instances from which to obtain parameters
1430
  @rtype: list of (origin, hypervisor, parameters)
1431
  @return: a list with all parameters found, indicating the hypervisor they
1432
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1433

1434
  """
1435
  hvp_data = []
1436

    
1437
  for hv_name in cluster.enabled_hypervisors:
1438
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1439

    
1440
  for os_name, os_hvp in cluster.os_hvp.items():
1441
    for hv_name, hv_params in os_hvp.items():
1442
      if hv_params:
1443
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1444
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1445

    
1446
  # TODO: collapse identical parameter values in a single one
1447
  for instance in instances:
1448
    if instance.hvparams:
1449
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1450
                       cluster.FillHV(instance)))
1451

    
1452
  return hvp_data
1453

    
1454

    
1455
class _VerifyErrors(object):
1456
  """Mix-in for cluster/group verify LUs.
1457

1458
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1459
  self.op and self._feedback_fn to be available.)
1460

1461
  """
1462
  TCLUSTER = "cluster"
1463
  TNODE = "node"
1464
  TINSTANCE = "instance"
1465

    
1466
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1467
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1468
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1469
  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
1470
  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
1471
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1472
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1473
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1474
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1475
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1476
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1477
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1478
  ENODEDRBD = (TNODE, "ENODEDRBD")
1479
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1480
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1481
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1482
  ENODEHV = (TNODE, "ENODEHV")
1483
  ENODELVM = (TNODE, "ENODELVM")
1484
  ENODEN1 = (TNODE, "ENODEN1")
1485
  ENODENET = (TNODE, "ENODENET")
1486
  ENODEOS = (TNODE, "ENODEOS")
1487
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1488
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1489
  ENODERPC = (TNODE, "ENODERPC")
1490
  ENODESSH = (TNODE, "ENODESSH")
1491
  ENODEVERSION = (TNODE, "ENODEVERSION")
1492
  ENODESETUP = (TNODE, "ENODESETUP")
1493
  ENODETIME = (TNODE, "ENODETIME")
1494
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1495

    
1496
  ETYPE_FIELD = "code"
1497
  ETYPE_ERROR = "ERROR"
1498
  ETYPE_WARNING = "WARNING"
1499

    
1500
  def _Error(self, ecode, item, msg, *args, **kwargs):
1501
    """Format an error message.
1502

1503
    Based on the opcode's error_codes parameter, either format a
1504
    parseable error code, or a simpler error string.
1505

1506
    This must be called only from Exec and functions called from Exec.
1507

1508
    """
1509
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1510
    itype, etxt = ecode
1511
    # first complete the msg
1512
    if args:
1513
      msg = msg % args
1514
    # then format the whole message
1515
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1516
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1517
    else:
1518
      if item:
1519
        item = " " + item
1520
      else:
1521
        item = ""
1522
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1523
    # and finally report it via the feedback_fn
1524
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1525

    
1526
  def _ErrorIf(self, cond, *args, **kwargs):
1527
    """Log an error message if the passed condition is True.
1528

1529
    """
1530
    cond = (bool(cond)
1531
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1532
    if cond:
1533
      self._Error(*args, **kwargs)
1534
    # do not mark the operation as failed for WARN cases only
1535
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1536
      self.bad = self.bad or cond
1537

    
1538

    
1539
class LUClusterVerify(NoHooksLU):
1540
  """Submits all jobs necessary to verify the cluster.
1541

1542
  """
1543
  REQ_BGL = False
1544

    
1545
  def ExpandNames(self):
1546
    self.needed_locks = {}
1547

    
1548
  def Exec(self, feedback_fn):
1549
    jobs = []
1550

    
1551
    if self.op.group_name:
1552
      groups = [self.op.group_name]
1553
      depends_fn = lambda: None
1554
    else:
1555
      groups = self.cfg.GetNodeGroupList()
1556

    
1557
      # Verify global configuration
1558
      jobs.append([opcodes.OpClusterVerifyConfig()])
1559

    
1560
      # Always depend on global verification
1561
      depends_fn = lambda: [(-len(jobs), [])]
1562

    
1563
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1564
                                              depends=depends_fn())]
1565
                for group in groups)
1566

    
1567
    # Fix up all parameters
1568
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1569
      op.debug_simulate_errors = self.op.debug_simulate_errors
1570
      op.verbose = self.op.verbose
1571
      op.error_codes = self.op.error_codes
1572
      try:
1573
        op.skip_checks = self.op.skip_checks
1574
      except AttributeError:
1575
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1576

    
1577
    return ResultWithJobs(jobs)
1578
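  # Example (hypothetical two-group cluster): without an explicit group_name
  # the submitted jobs look roughly like
  #   [[opcodes.OpClusterVerifyConfig()],
  #    [opcodes.OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])])],
  #    [opcodes.OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])])]]
  # where the negative, relative dependencies all point back at the
  # OpClusterVerifyConfig job, so group verification only starts after the
  # global configuration check has run.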

    
1579

    
1580
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1581
  """Verifies the cluster config.
1582

1583
  """
1584
  REQ_BGL = False
1585

    
1586
  def _VerifyHVP(self, hvp_data):
1587
    """Verifies locally the syntax of the hypervisor parameters.
1588

1589
    """
1590
    for item, hv_name, hv_params in hvp_data:
1591
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1592
             (item, hv_name))
1593
      try:
1594
        hv_class = hypervisor.GetHypervisor(hv_name)
1595
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1596
        hv_class.CheckParameterSyntax(hv_params)
1597
      except errors.GenericError, err:
1598
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
1599

    
1600
  def ExpandNames(self):
1601
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1602
    self.share_locks = _ShareAll()
1603

    
1604
  def CheckPrereq(self):
1605
    """Check prerequisites.
1606

1607
    """
1608
    # Retrieve all information
1609
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1610
    self.all_node_info = self.cfg.GetAllNodesInfo()
1611
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1612

    
1613
  def Exec(self, feedback_fn):
1614
    """Verify integrity of cluster, performing various test on nodes.
1615

1616
    """
1617
    self.bad = False
1618
    self._feedback_fn = feedback_fn
1619

    
1620
    feedback_fn("* Verifying cluster config")
1621

    
1622
    for msg in self.cfg.VerifyConfig():
1623
      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
1624

    
1625
    feedback_fn("* Verifying cluster certificate files")
1626

    
1627
    for cert_filename in constants.ALL_CERT_FILES:
1628
      (errcode, msg) = _VerifyCertificate(cert_filename)
1629
      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1630

    
1631
    feedback_fn("* Verifying hypervisor parameters")
1632

    
1633
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1634
                                                self.all_inst_info.values()))
1635

    
1636
    feedback_fn("* Verifying all nodes belong to an existing group")
1637

    
1638
    # We do this verification here because, should this bogus circumstance
1639
    # occur, it would never be caught by VerifyGroup, which only acts on
1640
    # nodes/instances reachable from existing node groups.
1641

    
1642
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1643
                         if node.group not in self.all_group_info)
1644

    
1645
    dangling_instances = {}
1646
    no_node_instances = []
1647

    
1648
    for inst in self.all_inst_info.values():
1649
      if inst.primary_node in dangling_nodes:
1650
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1651
      elif inst.primary_node not in self.all_node_info:
1652
        no_node_instances.append(inst.name)
1653

    
1654
    pretty_dangling = [
1655
        "%s (%s)" %
1656
        (node.name,
1657
         utils.CommaJoin(dangling_instances.get(node.name,
1658
                                                ["no instances"])))
1659
        for node in dangling_nodes]
1660

    
1661
    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
1662
                  "the following nodes (and their instances) belong to a non"
1663
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1664

    
1665
    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
1666
                  "the following instances have a non-existing primary-node:"
1667
                  " %s", utils.CommaJoin(no_node_instances))
1668

    
1669
    return not self.bad
1670

    
1671

    
1672
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1673
  """Verifies the status of a node group.
1674

1675
  """
1676
  HPATH = "cluster-verify"
1677
  HTYPE = constants.HTYPE_CLUSTER
1678
  REQ_BGL = False
1679

    
1680
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1681

    
1682
  class NodeImage(object):
1683
    """A class representing the logical and physical status of a node.
1684

1685
    @type name: string
1686
    @ivar name: the node name to which this object refers
1687
    @ivar volumes: a structure as returned from
1688
        L{ganeti.backend.GetVolumeList} (runtime)
1689
    @ivar instances: a list of running instances (runtime)
1690
    @ivar pinst: list of configured primary instances (config)
1691
    @ivar sinst: list of configured secondary instances (config)
1692
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1693
        instances for which this node is secondary (config)
1694
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1695
    @ivar dfree: free disk, as reported by the node (runtime)
1696
    @ivar offline: the offline status (config)
1697
    @type rpc_fail: boolean
1698
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1699
        not whether the individual keys were correct) (runtime)
1700
    @type lvm_fail: boolean
1701
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1702
    @type hyp_fail: boolean
1703
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1704
    @type ghost: boolean
1705
    @ivar ghost: whether this is a known node or not (config)
1706
    @type os_fail: boolean
1707
    @ivar os_fail: whether the RPC call didn't return valid OS data
1708
    @type oslist: list
1709
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1710
    @type vm_capable: boolean
1711
    @ivar vm_capable: whether the node can host instances
1712

1713
    """
1714
    def __init__(self, offline=False, name=None, vm_capable=True):
1715
      self.name = name
1716
      self.volumes = {}
1717
      self.instances = []
1718
      self.pinst = []
1719
      self.sinst = []
1720
      self.sbp = {}
1721
      self.mfree = 0
1722
      self.dfree = 0
1723
      self.offline = offline
1724
      self.vm_capable = vm_capable
1725
      self.rpc_fail = False
1726
      self.lvm_fail = False
1727
      self.hyp_fail = False
1728
      self.ghost = False
1729
      self.os_fail = False
1730
      self.oslist = {}
1731

    
1732
  def ExpandNames(self):
1733
    # This raises errors.OpPrereqError on its own:
1734
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1735

    
1736
    # Get instances in node group; this is unsafe and needs verification later
1737
    inst_names = \
1738
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1739

    
1740
    self.needed_locks = {
1741
      locking.LEVEL_INSTANCE: inst_names,
1742
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1743
      locking.LEVEL_NODE: [],
1744
      }
1745

    
1746
    self.share_locks = _ShareAll()
1747

    
1748
  def DeclareLocks(self, level):
1749
    if level == locking.LEVEL_NODE:
1750
      # Get members of node group; this is unsafe and needs verification later
1751
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1752

    
1753
      all_inst_info = self.cfg.GetAllInstancesInfo()
1754

    
1755
      # In Exec(), we warn about mirrored instances that have primary and
1756
      # secondary living in separate node groups. To fully verify that
1757
      # volumes for these instances are healthy, we will need to do an
1758
      # extra call to their secondaries. We ensure here those nodes will
1759
      # be locked.
1760
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1761
        # Important: access only the instances whose lock is owned
1762
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1763
          nodes.update(all_inst_info[inst].secondary_nodes)
1764

    
1765
      self.needed_locks[locking.LEVEL_NODE] = nodes
1766

    
1767
  def CheckPrereq(self):
1768
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1769
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1770

    
1771
    group_nodes = set(self.group_info.members)
1772
    group_instances = \
1773
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1774

    
1775
    unlocked_nodes = \
1776
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1777

    
1778
    unlocked_instances = \
1779
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1780

    
1781
    if unlocked_nodes:
1782
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1783
                                 utils.CommaJoin(unlocked_nodes),
1784
                                 errors.ECODE_STATE)
1785

    
1786
    if unlocked_instances:
1787
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1788
                                 utils.CommaJoin(unlocked_instances),
1789
                                 errors.ECODE_STATE)
1790

    
1791
    self.all_node_info = self.cfg.GetAllNodesInfo()
1792
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1793

    
1794
    self.my_node_names = utils.NiceSort(group_nodes)
1795
    self.my_inst_names = utils.NiceSort(group_instances)
1796

    
1797
    self.my_node_info = dict((name, self.all_node_info[name])
1798
                             for name in self.my_node_names)
1799

    
1800
    self.my_inst_info = dict((name, self.all_inst_info[name])
1801
                             for name in self.my_inst_names)
1802

    
1803
    # We detect here the nodes that will need the extra RPC calls for verifying
1804
    # split LV volumes; they should be locked.
1805
    extra_lv_nodes = set()
1806

    
1807
    for inst in self.my_inst_info.values():
1808
      if inst.disk_template in constants.DTS_INT_MIRROR:
1809
        for nname in inst.all_nodes:
1810
          if self.all_node_info[nname].group != self.group_uuid:
1811
            extra_lv_nodes.add(nname)
1812

    
1813
    unlocked_lv_nodes = \
1814
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1815

    
1816
    if unlocked_lv_nodes:
1817
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1818
                                 utils.CommaJoin(unlocked_lv_nodes),
1819
                                 errors.ECODE_STATE)
1820
    self.extra_lv_nodes = list(extra_lv_nodes)
1821

    
1822
  def _VerifyNode(self, ninfo, nresult):
1823
    """Perform some basic validation on data returned from a node.
1824

1825
      - check the result data structure is well formed and has all the
1826
        mandatory fields
1827
      - check ganeti version
1828

1829
    @type ninfo: L{objects.Node}
1830
    @param ninfo: the node to check
1831
    @param nresult: the results from the node
1832
    @rtype: boolean
1833
    @return: whether overall this call was successful (and we can expect
1834
         reasonable values in the response)
1835

1836
    """
1837
    node = ninfo.name
1838
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1839

    
1840
    # main result, nresult should be a non-empty dict
1841
    test = not nresult or not isinstance(nresult, dict)
1842
    _ErrorIf(test, self.ENODERPC, node,
1843
                  "unable to verify node: no data returned")
1844
    if test:
1845
      return False
1846

    
1847
    # compares ganeti version
1848
    local_version = constants.PROTOCOL_VERSION
1849
    remote_version = nresult.get("version", None)
1850
    test = not (remote_version and
1851
                isinstance(remote_version, (list, tuple)) and
1852
                len(remote_version) == 2)
1853
    _ErrorIf(test, self.ENODERPC, node,
1854
             "connection to node returned invalid data")
1855
    if test:
1856
      return False
1857

    
1858
    test = local_version != remote_version[0]
1859
    _ErrorIf(test, self.ENODEVERSION, node,
1860
             "incompatible protocol versions: master %s,"
1861
             " node %s", local_version, remote_version[0])
1862
    if test:
1863
      return False
1864

    
1865
    # node seems compatible, we can actually try to look into its results
1866

    
1867
    # full package version
1868
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1869
                  self.ENODEVERSION, node,
1870
                  "software version mismatch: master %s, node %s",
1871
                  constants.RELEASE_VERSION, remote_version[1],
1872
                  code=self.ETYPE_WARNING)
1873

    
1874
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1875
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1876
      for hv_name, hv_result in hyp_result.iteritems():
1877
        test = hv_result is not None
1878
        _ErrorIf(test, self.ENODEHV, node,
1879
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1880

    
1881
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1882
    if ninfo.vm_capable and isinstance(hvp_result, list):
1883
      for item, hv_name, hv_result in hvp_result:
1884
        _ErrorIf(True, self.ENODEHV, node,
1885
                 "hypervisor %s parameter verify failure (source %s): %s",
1886
                 hv_name, item, hv_result)
1887

    
1888
    test = nresult.get(constants.NV_NODESETUP,
1889
                       ["Missing NODESETUP results"])
1890
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1891
             "; ".join(test))
1892

    
1893
    return True
1894

    
1895
  def _VerifyNodeTime(self, ninfo, nresult,
1896
                      nvinfo_starttime, nvinfo_endtime):
1897
    """Check the node time.
1898

1899
    @type ninfo: L{objects.Node}
1900
    @param ninfo: the node to check
1901
    @param nresult: the remote results for the node
1902
    @param nvinfo_starttime: the start time of the RPC call
1903
    @param nvinfo_endtime: the end time of the RPC call
1904

1905
    """
1906
    node = ninfo.name
1907
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1908

    
1909
    ntime = nresult.get(constants.NV_TIME, None)
1910
    try:
1911
      ntime_merged = utils.MergeTime(ntime)
1912
    except (ValueError, TypeError):
1913
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1914
      return
1915

    
1916
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1917
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1918
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1919
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1920
    else:
1921
      ntime_diff = None
1922

    
1923
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1924
             "Node time diverges by at least %s from master node time",
1925
             ntime_diff)
1926
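  # Worked example (assuming a NODE_MAX_CLOCK_SKEW of 150 seconds): if the RPC
  # started at t=1000 and the node reports a merged time of 800, then
  # 800 < 1000 - 150 holds, ntime_diff becomes "200.0s" and an ENODETIME error
  # is reported; any reported time between 850 and nvinfo_endtime + 150 passes.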

    
1927
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1928
    """Check the node LVM results.
1929

1930
    @type ninfo: L{objects.Node}
1931
    @param ninfo: the node to check
1932
    @param nresult: the remote results for the node
1933
    @param vg_name: the configured VG name
1934

1935
    """
1936
    if vg_name is None:
1937
      return
1938

    
1939
    node = ninfo.name
1940
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1941

    
1942
    # checks vg existence and size > 20G
1943
    vglist = nresult.get(constants.NV_VGLIST, None)
1944
    test = not vglist
1945
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1946
    if not test:
1947
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1948
                                            constants.MIN_VG_SIZE)
1949
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1950

    
1951
    # check pv names
1952
    pvlist = nresult.get(constants.NV_PVLIST, None)
1953
    test = pvlist is None
1954
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1955
    if not test:
1956
      # check that ':' is not present in PV names, since it's a
1957
      # special character for lvcreate (denotes the range of PEs to
1958
      # use on the PV)
1959
      for _, pvname, owner_vg in pvlist:
1960
        test = ":" in pvname
1961
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1962
                 " '%s' of VG '%s'", pvname, owner_vg)
1963

    
1964
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1965
    """Check the node bridges.
1966

1967
    @type ninfo: L{objects.Node}
1968
    @param ninfo: the node to check
1969
    @param nresult: the remote results for the node
1970
    @param bridges: the expected list of bridges
1971

1972
    """
1973
    if not bridges:
1974
      return
1975

    
1976
    node = ninfo.name
1977
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1978

    
1979
    missing = nresult.get(constants.NV_BRIDGES, None)
1980
    test = not isinstance(missing, list)
1981
    _ErrorIf(test, self.ENODENET, node,
1982
             "did not return valid bridge information")
1983
    if not test:
1984
      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
1985
               utils.CommaJoin(sorted(missing)))
1986

    
1987
  def _VerifyNodeNetwork(self, ninfo, nresult):
1988
    """Check the node network connectivity results.
1989

1990
    @type ninfo: L{objects.Node}
1991
    @param ninfo: the node to check
1992
    @param nresult: the remote results for the node
1993

1994
    """
1995
    node = ninfo.name
1996
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1997

    
1998
    test = constants.NV_NODELIST not in nresult
1999
    _ErrorIf(test, self.ENODESSH, node,
2000
             "node hasn't returned node ssh connectivity data")
2001
    if not test:
2002
      if nresult[constants.NV_NODELIST]:
2003
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2004
          _ErrorIf(True, self.ENODESSH, node,
2005
                   "ssh communication with node '%s': %s", a_node, a_msg)
2006

    
2007
    test = constants.NV_NODENETTEST not in nresult
2008
    _ErrorIf(test, self.ENODENET, node,
2009
             "node hasn't returned node tcp connectivity data")
2010
    if not test:
2011
      if nresult[constants.NV_NODENETTEST]:
2012
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2013
        for anode in nlist:
2014
          _ErrorIf(True, self.ENODENET, node,
2015
                   "tcp communication with node '%s': %s",
2016
                   anode, nresult[constants.NV_NODENETTEST][anode])
2017

    
2018
    test = constants.NV_MASTERIP not in nresult
2019
    _ErrorIf(test, self.ENODENET, node,
2020
             "node hasn't returned node master IP reachability data")
2021
    if not test:
2022
      if not nresult[constants.NV_MASTERIP]:
2023
        if node == self.master_node:
2024
          msg = "the master node cannot reach the master IP (not configured?)"
2025
        else:
2026
          msg = "cannot reach the master IP"
2027
        _ErrorIf(True, self.ENODENET, node, msg)
2028

    
2029
  def _VerifyInstance(self, instance, instanceconfig, node_image,
2030
                      diskstatus):
2031
    """Verify an instance.
2032

2033
    This function checks to see if the required block devices are
2034
    available on the instance's node.
2035

2036
    """
2037
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2038
    node_current = instanceconfig.primary_node
2039

    
2040
    node_vol_should = {}
2041
    instanceconfig.MapLVsByNode(node_vol_should)
2042

    
2043
    for node in node_vol_should:
2044
      n_img = node_image[node]
2045
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2046
        # ignore missing volumes on offline or broken nodes
2047
        continue
2048
      for volume in node_vol_should[node]:
2049
        test = volume not in n_img.volumes
2050
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
2051
                 "volume %s missing on node %s", volume, node)
2052

    
2053
    if instanceconfig.admin_up:
2054
      pri_img = node_image[node_current]
2055
      test = instance not in pri_img.instances and not pri_img.offline
2056
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
2057
               "instance not running on its primary node %s",
2058
               node_current)
2059

    
2060
    diskdata = [(nname, success, status, idx)
2061
                for (nname, disks) in diskstatus.items()
2062
                for idx, (success, status) in enumerate(disks)]
2063

    
2064
    for nname, success, bdev_status, idx in diskdata:
2065
      # the 'ghost node' construction in Exec() ensures that we have a
2066
      # node here
2067
      snode = node_image[nname]
2068
      bad_snode = snode.ghost or snode.offline
2069
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
2070
               self.EINSTANCEFAULTYDISK, instance,
2071
               "couldn't retrieve status for disk/%s on %s: %s",
2072
               idx, nname, bdev_status)
2073
      _ErrorIf((instanceconfig.admin_up and success and
2074
                bdev_status.ldisk_status == constants.LDS_FAULTY),
2075
               self.EINSTANCEFAULTYDISK, instance,
2076
               "disk/%s on %s is faulty", idx, nname)
2077

    
2078
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2079
    """Verify if there are any unknown volumes in the cluster.
2080

2081
    The .os, .swap and backup volumes are ignored. All other volumes are
2082
    reported as unknown.
2083

2084
    @type reserved: L{ganeti.utils.FieldSet}
2085
    @param reserved: a FieldSet of reserved volume names
2086

2087
    """
2088
    for node, n_img in node_image.items():
2089
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2090
          self.all_node_info[node].group != self.group_uuid):
2091
        # skip non-healthy nodes
2092
        continue
2093
      for volume in n_img.volumes:
2094
        test = ((node not in node_vol_should or
2095
                volume not in node_vol_should[node]) and
2096
                not reserved.Matches(volume))
2097
        self._ErrorIf(test, self.ENODEORPHANLV, node,
2098
                      "volume %s is unknown", volume)
2099

    
2100
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2101
    """Verify N+1 Memory Resilience.
2102

2103
    Check that if one single node dies we can still start all the
2104
    instances it was primary for.
2105

2106
    """
2107
    cluster_info = self.cfg.GetClusterInfo()
2108
    for node, n_img in node_image.items():
2109
      # This code checks that every node which is now listed as
2110
      # secondary has enough memory to host all the instances it is
2111
      # secondary for, should a single other node in the cluster fail.
2112
      # FIXME: not ready for failover to an arbitrary node
2113
      # FIXME: does not support file-backed instances
2114
      # WARNING: we currently take into account down instances as well
2115
      # as up ones, considering that even if they're down someone
2116
      # might want to start them even in the event of a node failure.
2117
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
2118
        # we're skipping nodes marked offline and nodes in other groups from
2119
        # the N+1 warning, since most likely we don't have good memory
2120
      # information from them; we already list instances living on such
2121
        # nodes, and that's enough warning
2122
        continue
2123
      for prinode, instances in n_img.sbp.items():
2124
        needed_mem = 0
2125
        for instance in instances:
2126
          bep = cluster_info.FillBE(instance_cfg[instance])
2127
          if bep[constants.BE_AUTO_BALANCE]:
2128
            needed_mem += bep[constants.BE_MEMORY]
2129
        test = n_img.mfree < needed_mem
2130
        self._ErrorIf(test, self.ENODEN1, node,
2131
                      "not enough memory to accomodate instance failovers"
2132
                      " should node %s fail (%dMiB needed, %dMiB available)",
2133
                      prinode, needed_mem, n_img.mfree)
2134

    
2135
  @classmethod
2136
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2137
                   (files_all, files_opt, files_mc, files_vm)):
2138
    """Verifies file checksums collected from all nodes.
2139

2140
    @param errorif: Callback for reporting errors
2141
    @param nodeinfo: List of L{objects.Node} objects
2142
    @param master_node: Name of master node
2143
    @param all_nvinfo: RPC results
2144

2145
    """
2146
    # Define functions determining which nodes to consider for a file
2147
    files2nodefn = [
2148
      (files_all, None),
2149
      (files_mc, lambda node: (node.master_candidate or
2150
                               node.name == master_node)),
2151
      (files_vm, lambda node: node.vm_capable),
2152
      ]
2153

    
2154
    # Build mapping from filename to list of nodes which should have the file
2155
    nodefiles = {}
2156
    for (files, fn) in files2nodefn:
2157
      if fn is None:
2158
        filenodes = nodeinfo
2159
      else:
2160
        filenodes = filter(fn, nodeinfo)
2161
      nodefiles.update((filename,
2162
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2163
                       for filename in files)
2164

    
2165
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2166

    
2167
    fileinfo = dict((filename, {}) for filename in nodefiles)
2168
    ignore_nodes = set()
2169

    
2170
    for node in nodeinfo:
2171
      if node.offline:
2172
        ignore_nodes.add(node.name)
2173
        continue
2174

    
2175
      nresult = all_nvinfo[node.name]
2176

    
2177
      if nresult.fail_msg or not nresult.payload:
2178
        node_files = None
2179
      else:
2180
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2181

    
2182
      test = not (node_files and isinstance(node_files, dict))
2183
      errorif(test, cls.ENODEFILECHECK, node.name,
2184
              "Node did not return file checksum data")
2185
      if test:
2186
        ignore_nodes.add(node.name)
2187
        continue
2188

    
2189
      # Build per-checksum mapping from filename to nodes having it
2190
      for (filename, checksum) in node_files.items():
2191
        assert filename in nodefiles
2192
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2193

    
2194
    for (filename, checksums) in fileinfo.items():
2195
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2196

    
2197
      # Nodes having the file
2198
      with_file = frozenset(node_name
2199
                            for nodes in fileinfo[filename].values()
2200
                            for node_name in nodes) - ignore_nodes
2201

    
2202
      expected_nodes = nodefiles[filename] - ignore_nodes
2203

    
2204
      # Nodes missing file
2205
      missing_file = expected_nodes - with_file
2206

    
2207
      if filename in files_opt:
2208
        # All or no nodes
2209
        errorif(missing_file and missing_file != expected_nodes,
2210
                cls.ECLUSTERFILECHECK, None,
2211
                "File %s is optional, but it must exist on all or no"
2212
                " nodes (not found on %s)",
2213
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2214
      else:
2215
        # Non-optional files
2216
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
2217
                "File %s is missing from node(s) %s", filename,
2218
                utils.CommaJoin(utils.NiceSort(missing_file)))
2219

    
2220
        # Warn if a node has a file it shouldn't
2221
        unexpected = with_file - expected_nodes
2222
        errorif(unexpected,
2223
                cls.ECLUSTERFILECHECK, None,
2224
                "File %s should not exist on node(s) %s",
2225
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2226

    
2227
      # See if there are multiple versions of the file
2228
      test = len(checksums) > 1
2229
      if test:
2230
        variants = ["variant %s on %s" %
2231
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2232
                    for (idx, (checksum, nodes)) in
2233
                      enumerate(sorted(checksums.items()))]
2234
      else:
2235
        variants = []
2236

    
2237
      errorif(test, cls.ECLUSTERFILECHECK, None,
2238
              "File %s found with %s different checksums (%s)",
2239
              filename, len(checksums), "; ".join(variants))
2240
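  # Illustrative shape of the intermediate structures (file and node names
  # invented):
  #   nodefiles = {"/etc/example.conf": frozenset(["node1", "node2"])}
  #   fileinfo  = {"/etc/example.conf": {"<checksum-aaaa>": set(["node1"]),
  #                                      "<checksum-bbbb>": set(["node2"])}}
  # such a fileinfo entry would be reported as two checksum variants of the
  # same file.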

    
2241
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2242
                      drbd_map):
2243
    """Verifies and the node DRBD status.
2244

2245
    @type ninfo: L{objects.Node}
2246
    @param ninfo: the node to check
2247
    @param nresult: the remote results for the node
2248
    @param instanceinfo: the dict of instances
2249
    @param drbd_helper: the configured DRBD usermode helper
2250
    @param drbd_map: the DRBD map as returned by
2251
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2252

2253
    """
2254
    node = ninfo.name
2255
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2256

    
2257
    if drbd_helper:
2258
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2259
      test = (helper_result is None)
2260
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
2261
               "no drbd usermode helper returned")
2262
      if helper_result:
2263
        status, payload = helper_result
2264
        test = not status
2265
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2266
                 "drbd usermode helper check unsuccessful: %s", payload)
2267
        test = status and (payload != drbd_helper)
2268
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
2269
                 "wrong drbd usermode helper: %s", payload)
2270

    
2271
    # compute the DRBD minors
2272
    node_drbd = {}
2273
    for minor, instance in drbd_map[node].items():
2274
      test = instance not in instanceinfo
2275
      _ErrorIf(test, self.ECLUSTERCFG, None,
2276
               "ghost instance '%s' in temporary DRBD map", instance)
2277
        # ghost instance should not be running, but otherwise we
2278
        # don't give double warnings (both ghost instance and
2279
        # unallocated minor in use)
2280
      if test:
2281
        node_drbd[minor] = (instance, False)
2282
      else:
2283
        instance = instanceinfo[instance]
2284
        node_drbd[minor] = (instance.name, instance.admin_up)
2285

    
2286
    # and now check them
2287
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2288
    test = not isinstance(used_minors, (tuple, list))
2289
    _ErrorIf(test, self.ENODEDRBD, node,
2290
             "cannot parse drbd status file: %s", str(used_minors))
2291
    if test:
2292
      # we cannot check drbd status
2293
      return
2294

    
2295
    for minor, (iname, must_exist) in node_drbd.items():
2296
      test = minor not in used_minors and must_exist
2297
      _ErrorIf(test, self.ENODEDRBD, node,
2298
               "drbd minor %d of instance %s is not active", minor, iname)
2299
    for minor in used_minors:
2300
      test = minor not in node_drbd
2301
      _ErrorIf(test, self.ENODEDRBD, node,
2302
               "unallocated drbd minor %d is in use", minor)
2303

    
2304
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2305
    """Builds the node OS structures.
2306

2307
    @type ninfo: L{objects.Node}
2308
    @param ninfo: the node to check
2309
    @param nresult: the remote results for the node
2310
    @param nimg: the node image object
2311

2312
    """
2313
    node = ninfo.name
2314
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2315

    
2316
    remote_os = nresult.get(constants.NV_OSLIST, None)
2317
    test = (not isinstance(remote_os, list) or
2318
            not compat.all(isinstance(v, list) and len(v) == 7
2319
                           for v in remote_os))
2320

    
2321
    _ErrorIf(test, self.ENODEOS, node,
2322
             "node hasn't returned valid OS data")
2323

    
2324
    nimg.os_fail = test
2325

    
2326
    if test:
2327
      return
2328

    
2329
    os_dict = {}
2330

    
2331
    for (name, os_path, status, diagnose,
2332
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2333

    
2334
      if name not in os_dict:
2335
        os_dict[name] = []
2336

    
2337
      # parameters is a list of lists instead of list of tuples due to
2338
      # JSON lacking a real tuple type, fix it:
2339
      parameters = [tuple(v) for v in parameters]
2340
      os_dict[name].append((os_path, status, diagnose,
2341
                            set(variants), set(parameters), set(api_ver)))
2342

    
2343
    nimg.oslist = os_dict
2344
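  # Sketch of the resulting structure (OS name, path and values invented):
  #   nimg.oslist = {
  #     "debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
  #        set(["default"]), set([("paramA", "doc")]), set([20]))],
  #     }
  # i.e. one (path, status, diagnose, variants, parameters, api_versions)
  # entry per location the OS was found in, the first one shadowing the rest.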

    
2345
  def _VerifyNodeOS(self, ninfo, nimg, base):
2346
    """Verifies the node OS list.
2347

2348
    @type ninfo: L{objects.Node}
2349
    @param ninfo: the node to check
2350
    @param nimg: the node image object
2351
    @param base: the 'template' node we match against (e.g. from the master)
2352

2353
    """
2354
    node = ninfo.name
2355
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2356

    
2357
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2358

    
2359
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2360
    for os_name, os_data in nimg.oslist.items():
2361
      assert os_data, "Empty OS status for OS %s?!" % os_name
2362
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2363
      _ErrorIf(not f_status, self.ENODEOS, node,
2364
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2365
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
2366
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2367
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2368
      # comparisons with the 'base' image
2369
      test = os_name not in base.oslist
2370
      _ErrorIf(test, self.ENODEOS, node,
2371
               "Extra OS %s not present on reference node (%s)",
2372
               os_name, base.name)
2373
      if test:
2374
        continue
2375
      assert base.oslist[os_name], "Base node has empty OS status?"
2376
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2377
      if not b_status:
2378
        # base OS is invalid, skipping
2379
        continue
2380
      for kind, a, b in [("API version", f_api, b_api),
2381
                         ("variants list", f_var, b_var),
2382
                         ("parameters", beautify_params(f_param),
2383
                          beautify_params(b_param))]:
2384
        _ErrorIf(a != b, self.ENODEOS, node,
2385
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2386
                 kind, os_name, base.name,
2387
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2388

    
2389
    # check any missing OSes
2390
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2391
    _ErrorIf(missing, self.ENODEOS, node,
2392
             "OSes present on reference node %s but missing on this node: %s",
2393
             base.name, utils.CommaJoin(missing))
2394

    
2395
  def _VerifyOob(self, ninfo, nresult):
2396
    """Verifies out of band functionality of a node.
2397

2398
    @type ninfo: L{objects.Node}
2399
    @param ninfo: the node to check
2400
    @param nresult: the remote results for the node
2401

2402
    """
2403
    node = ninfo.name
2404
    # We just have to verify the paths on master and/or master candidates
2405
    # as the oob helper is invoked on the master
2406
    if ((ninfo.master_candidate or ninfo.master_capable) and
2407
        constants.NV_OOB_PATHS in nresult):
2408
      for path_result in nresult[constants.NV_OOB_PATHS]:
2409
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
2410

    
2411
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2412
    """Verifies and updates the node volume data.
2413

2414
    This function will update a L{NodeImage}'s internal structures
2415
    with data from the remote call.
2416

2417
    @type ninfo: L{objects.Node}
2418
    @param ninfo: the node to check
2419
    @param nresult: the remote results for the node
2420
    @param nimg: the node image object
2421
    @param vg_name: the configured VG name
2422

2423
    """
2424
    node = ninfo.name
2425
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2426

    
2427
    nimg.lvm_fail = True
2428
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2429
    if vg_name is None:
2430
      pass
2431
    elif isinstance(lvdata, basestring):
2432
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
2433
               utils.SafeEncode(lvdata))
2434
    elif not isinstance(lvdata, dict):
2435
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
2436
    else:
2437
      nimg.volumes = lvdata
2438
      nimg.lvm_fail = False
2439

    
2440
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2441
    """Verifies and updates the node instance list.
2442

2443
    If the listing was successful, then updates this node's instance
2444
    list. Otherwise, it marks the RPC call as failed for the instance
2445
    list key.
2446

2447
    @type ninfo: L{objects.Node}
2448
    @param ninfo: the node to check
2449
    @param nresult: the remote results for the node
2450
    @param nimg: the node image object
2451

2452
    """
2453
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2454
    test = not isinstance(idata, list)
2455
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
2456
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
2457
    if test:
2458
      nimg.hyp_fail = True
2459
    else:
2460
      nimg.instances = idata
2461

    
2462
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2463
    """Verifies and computes a node information map
2464

2465
    @type ninfo: L{objects.Node}
2466
    @param ninfo: the node to check
2467
    @param nresult: the remote results for the node
2468
    @param nimg: the node image object
2469
    @param vg_name: the configured VG name
2470

2471
    """
2472
    node = ninfo.name
2473
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2474

    
2475
    # try to read free memory (from the hypervisor)
2476
    hv_info = nresult.get(constants.NV_HVINFO, None)
2477
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2478
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
2479
    if not test:
2480
      try:
2481
        nimg.mfree = int(hv_info["memory_free"])
2482
      except (ValueError, TypeError):
2483
        _ErrorIf(True, self.ENODERPC, node,
2484
                 "node returned invalid nodeinfo, check hypervisor")
2485

    
2486
    # FIXME: devise a free space model for file based instances as well
2487
    if vg_name is not None:
2488
      test = (constants.NV_VGLIST not in nresult or
2489
              vg_name not in nresult[constants.NV_VGLIST])
2490
      _ErrorIf(test, self.ENODELVM, node,
2491
               "node didn't return data for the volume group '%s'"
2492
               " - it is either missing or broken", vg_name)
2493
      if not test:
2494
        try:
2495
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2496
        except (ValueError, TypeError):
2497
          _ErrorIf(True, self.ENODERPC, node,
2498
                   "node returned invalid LVM info, check LVM status")
2499

    
2500
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2501
    """Gets per-disk status information for all instances.
2502

2503
    @type nodelist: list of strings
2504
    @param nodelist: Node names
2505
    @type node_image: dict of (name, L{objects.Node})
2506
    @param node_image: Node objects
2507
    @type instanceinfo: dict of (name, L{objects.Instance})
2508
    @param instanceinfo: Instance objects
2509
    @rtype: {instance: {node: [(success, payload)]}}
2510
    @return: a dictionary of per-instance dictionaries with nodes as
2511
        keys and disk information as values; the disk information is a
2512
        list of tuples (success, payload)
2513

2514
    """
2515
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2516

    
2517
    node_disks = {}
2518
    node_disks_devonly = {}
2519
    diskless_instances = set()
2520
    diskless = constants.DT_DISKLESS
2521

    
2522
    for nname in nodelist:
2523
      node_instances = list(itertools.chain(node_image[nname].pinst,
2524
                                            node_image[nname].sinst))
2525
      diskless_instances.update(inst for inst in node_instances
2526
                                if instanceinfo[inst].disk_template == diskless)
2527
      disks = [(inst, disk)
2528
               for inst in node_instances
2529
               for disk in instanceinfo[inst].disks]
2530

    
2531
      if not disks:
2532
        # No need to collect data
2533
        continue
2534

    
2535
      node_disks[nname] = disks
2536

    
2537
      # Creating copies as SetDiskID below will modify the objects and that can
2538
      # lead to incorrect data returned from nodes
2539
      devonly = [dev.Copy() for (_, dev) in disks]
2540

    
2541
      for dev in devonly:
2542
        self.cfg.SetDiskID(dev, nname)
2543

    
2544
      node_disks_devonly[nname] = devonly
2545

    
2546
    assert len(node_disks) == len(node_disks_devonly)
2547

    
2548
    # Collect data from all nodes with disks
2549
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2550
                                                          node_disks_devonly)
2551

    
2552
    assert len(result) == len(node_disks)
2553

    
2554
    instdisk = {}
2555

    
2556
    for (nname, nres) in result.items():
2557
      disks = node_disks[nname]
2558

    
2559
      if nres.offline:
2560
        # No data from this node
2561
        data = len(disks) * [(False, "node offline")]
2562
      else:
2563
        msg = nres.fail_msg
2564
        _ErrorIf(msg, self.ENODERPC, nname,
2565
                 "while getting disk information: %s", msg)
2566
        if msg:
2567
          # No data from this node
2568
          data = len(disks) * [(False, msg)]
2569
        else:
2570
          data = []
2571
          for idx, i in enumerate(nres.payload):
2572
            if isinstance(i, (tuple, list)) and len(i) == 2:
2573
              data.append(i)
2574
            else:
2575
              logging.warning("Invalid result from node %s, entry %d: %s",
2576
                              nname, idx, i)
2577
              data.append((False, "Invalid result from the remote node"))
2578

    
2579
      for ((inst, _), status) in zip(disks, data):
2580
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2581

    
2582
    # Add empty entries for diskless instances.
2583
    for inst in diskless_instances:
2584
      assert inst not in instdisk
2585
      instdisk[inst] = {}
2586

    
2587
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2588
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2589
                      compat.all(isinstance(s, (tuple, list)) and
2590
                                 len(s) == 2 for s in statuses)
2591
                      for inst, nnames in instdisk.items()
2592
                      for nname, statuses in nnames.items())
2593
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2594

    
2595
    return instdisk
2596
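  # Example of the returned mapping (instance and node names invented):
  #   {"inst1": {"node1": [(True, status0), (True, status1)],
  #              "node2": [(False, "node offline"), (False, "node offline")]},
  #    "diskless1": {}}
  # i.e. one (success, payload) pair per disk, per node the instance uses.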

    
2597
  @staticmethod
2598
  def _SshNodeSelector(group_uuid, all_nodes):
2599
    """Create endless iterators for all potential SSH check hosts.
2600

2601
    """
2602
    nodes = [node for node in all_nodes
2603
             if (node.group != group_uuid and
2604
                 not node.offline)]
2605
    keyfunc = operator.attrgetter("group")
2606

    
2607
    return map(itertools.cycle,
2608
               [sorted(map(operator.attrgetter("name"), names))
2609
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2610
                                                  keyfunc)])
2611

    
2612
  @classmethod
2613
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2614
    """Choose which nodes should talk to which other nodes.
2615

2616
    We will make nodes contact all nodes in their group, and one node from
2617
    every other group.
2618

2619
    @warning: This algorithm has a known issue if one node group is much
2620
      smaller than others (e.g. just one node). In such a case all other
2621
      nodes will talk to the single node.
2622

2623
    """
2624
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2625
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2626

    
2627
    return (online_nodes,
2628
            dict((name, sorted([i.next() for i in sel]))
2629
                 for name in online_nodes))
2630
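  # Example return value for a three-group cluster (node names invented), as
  # seen from a group containing nodes "a1" and "a2":
  #   (["a1", "a2"], {"a1": ["b1", "c1"], "a2": ["b2", "c2"]})
  # every node checks SSH connectivity to all nodes of its own group plus one
  # node from each other group, cycling through them to spread the load.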

    
2631
  def BuildHooksEnv(self):
2632
    """Build hooks env.
2633

2634
    Cluster-Verify hooks just run in the post phase and their failure causes
2635
    the output to be logged in the verify output and the verification to fail.
2636

2637
    """
2638
    env = {
2639
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2640
      }
2641

    
2642
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2643
               for node in self.my_node_info.values())
2644

    
2645
    return env
2646
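  # Example environment (tag and node name values invented):
  #   {"CLUSTER_TAGS": "prod critical",
  #    "NODE_TAGS_node1.example.com": "rack1 ssd"}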

    
2647
  def BuildHooksNodes(self):
2648
    """Build hooks nodes.
2649

2650
    """
2651
    return ([], self.my_node_names)
2652

    
2653
  def Exec(self, feedback_fn):
2654
    """Verify integrity of the node group, performing various test on nodes.
2655

2656
    """
2657
    # This method has too many local variables. pylint: disable=R0914
2658
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2659

    
2660
    if not self.my_node_names:
2661
      # empty node group
2662
      feedback_fn("* Empty node group, skipping verification")
2663
      return True
2664

    
2665
    self.bad = False
2666
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2667
    verbose = self.op.verbose
2668
    self._feedback_fn = feedback_fn
2669

    
2670
    vg_name = self.cfg.GetVGName()
2671
    drbd_helper = self.cfg.GetDRBDHelper()
2672
    cluster = self.cfg.GetClusterInfo()
2673
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2674
    hypervisors = cluster.enabled_hypervisors
2675
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2676

    
2677
    i_non_redundant = [] # Non redundant instances
2678
    i_non_a_balanced = [] # Non auto-balanced instances
2679
    n_offline = 0 # Count of offline nodes
2680
    n_drained = 0 # Count of nodes being drained
2681
    node_vol_should = {}
2682

    
2683
    # FIXME: verify OS list
2684

    
2685
    # File verification
2686
    filemap = _ComputeAncillaryFiles(cluster, False)
2687

    
2688
    # do local checksums
2689
    master_node = self.master_node = self.cfg.GetMasterNode()
2690
    master_ip = self.cfg.GetMasterIP()
2691

    
2692
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2693

    
2694
    node_verify_param = {
2695
      constants.NV_FILELIST:
2696
        utils.UniqueSequence(filename
2697
                             for files in filemap
2698
                             for filename in files),
2699
      constants.NV_NODELIST:
2700
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2701
                                  self.all_node_info.values()),
2702
      constants.NV_HYPERVISOR: hypervisors,
2703
      constants.NV_HVPARAMS:
2704
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2705
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2706
                                 for node in node_data_list
2707
                                 if not node.offline],
2708
      constants.NV_INSTANCELIST: hypervisors,
2709
      constants.NV_VERSION: None,
2710
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2711
      constants.NV_NODESETUP: None,
2712
      constants.NV_TIME: None,
2713
      constants.NV_MASTERIP: (master_node, master_ip),
2714
      constants.NV_OSLIST: None,
2715
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2716
      }
2717

    
2718
    if vg_name is not None:
2719
      node_verify_param[constants.NV_VGLIST] = None
2720
      node_verify_param[constants.NV_LVLIST] = vg_name
2721
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2722
      node_verify_param[constants.NV_DRBDLIST] = None
2723

    
2724
    if drbd_helper:
2725
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2726

    
2727
    # bridge checks
2728
    # FIXME: this needs to be changed per node-group, not cluster-wide
2729
    bridges = set()
2730
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2731
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2732
      bridges.add(default_nicpp[constants.NIC_LINK])
2733
    for instance in self.my_inst_info.values():
2734
      for nic in instance.nics:
2735
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2736
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2737
          bridges.add(full_nic[constants.NIC_LINK])
2738

    
2739
    if bridges:
2740
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2741

    
2742
    # Build our expected cluster state
2743
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2744
                                                 name=node.name,
2745
                                                 vm_capable=node.vm_capable))
2746
                      for node in node_data_list)
2747

    
2748
    # Gather OOB paths
2749
    oob_paths = []
2750
    for node in self.all_node_info.values():
2751
      path = _SupportsOob(self.cfg, node)
2752
      if path and path not in oob_paths:
2753
        oob_paths.append(path)
2754

    
2755
    if oob_paths:
2756
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2757

    
2758
    for instance in self.my_inst_names:
2759
      inst_config = self.my_inst_info[instance]
2760

    
2761
      for nname in inst_config.all_nodes:
2762
        if nname not in node_image:
2763
          gnode = self.NodeImage(name=nname)
2764
          gnode.ghost = (nname not in self.all_node_info)
2765
          node_image[nname] = gnode
2766

    
2767
      inst_config.MapLVsByNode(node_vol_should)
2768

    
2769
      pnode = inst_config.primary_node
2770
      node_image[pnode].pinst.append(instance)
2771

    
2772
      for snode in inst_config.secondary_nodes:
2773
        nimg = node_image[snode]
2774
        nimg.sinst.append(instance)
2775
        if pnode not in nimg.sbp:
2776
          nimg.sbp[pnode] = []
2777
        nimg.sbp[pnode].append(instance)
2778

    
2779
    # At this point, we have the in-memory data structures complete,
2780
    # except for the runtime information, which we'll gather next
2781

    
2782
    # Due to the way our RPC system works, exact response times cannot be
2783
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2784
    # time before and after executing the request, we can at least have a time
2785
    # window.
2786
    nvinfo_starttime = time.time()
2787
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2788
                                           node_verify_param,
2789
                                           self.cfg.GetClusterName())
2790
    nvinfo_endtime = time.time()
2791

    
2792
    if self.extra_lv_nodes and vg_name is not None:
2793
      extra_lv_nvinfo = \
2794
          self.rpc.call_node_verify(self.extra_lv_nodes,
2795
                                    {constants.NV_LVLIST: vg_name},
2796
                                    self.cfg.GetClusterName())
2797
    else:
2798
      extra_lv_nvinfo = {}
2799

    
2800
    all_drbd_map = self.cfg.ComputeDRBDMap()
2801

    
2802
    feedback_fn("* Gathering disk information (%s nodes)" %
2803
                len(self.my_node_names))
2804
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2805
                                     self.my_inst_info)
2806

    
2807
    feedback_fn("* Verifying configuration file consistency")
2808

    
2809
    # If not all nodes are being checked, we need to make sure the master node
2810
    # and a non-checked vm_capable node are in the list.
2811
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2812
    if absent_nodes:
2813
      vf_nvinfo = all_nvinfo.copy()
2814
      vf_node_info = list(self.my_node_info.values())
2815
      additional_nodes = []
2816
      if master_node not in self.my_node_info:
2817
        additional_nodes.append(master_node)
2818
        vf_node_info.append(self.all_node_info[master_node])
2819
      # Add the first vm_capable node we find which is not included
2820
      for node in absent_nodes:
2821
        nodeinfo = self.all_node_info[node]
2822
        if nodeinfo.vm_capable and not nodeinfo.offline:
2823
          additional_nodes.append(node)
2824
          vf_node_info.append(self.all_node_info[node])
2825
          break
2826
      key = constants.NV_FILELIST
2827
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2828
                                                 {key: node_verify_param[key]},
2829
                                                 self.cfg.GetClusterName()))
2830
    else:
2831
      vf_nvinfo = all_nvinfo
2832
      vf_node_info = self.my_node_info.values()
2833

    
2834
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2835

    
2836
    feedback_fn("* Verifying node status")
2837

    
2838
    refos_img = None
2839

    
2840
    for node_i in node_data_list:
2841
      node = node_i.name
2842
      nimg = node_image[node]
2843

    
2844
      if node_i.offline:
2845
        if verbose:
2846
          feedback_fn("* Skipping offline node %s" % (node,))
2847
        n_offline += 1
2848
        continue
2849

    
2850
      if node == master_node:
2851
        ntype = "master"
2852
      elif node_i.master_candidate:
2853
        ntype = "master candidate"
2854
      elif node_i.drained:
2855
        ntype = "drained"
2856
        n_drained += 1
2857
      else:
2858
        ntype = "regular"
2859
      if verbose:
2860
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2861

    
2862
      msg = all_nvinfo[node].fail_msg
2863
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2864
      if msg:
2865
        nimg.rpc_fail = True
2866
        continue
2867

    
2868
      nresult = all_nvinfo[node].payload
2869

    
2870
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2871
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2872
      self._VerifyNodeNetwork(node_i, nresult)
2873
      self._VerifyOob(node_i, nresult)
2874

    
2875
      if nimg.vm_capable:
2876
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2877
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2878
                             all_drbd_map)
2879

    
2880
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2881
        self._UpdateNodeInstances(node_i, nresult, nimg)
2882
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2883
        self._UpdateNodeOS(node_i, nresult, nimg)
2884

    
2885
        if not nimg.os_fail:
2886
          if refos_img is None:
2887
            refos_img = nimg
2888
          self._VerifyNodeOS(node_i, nimg, refos_img)
2889
        self._VerifyNodeBridges(node_i, nresult, bridges)
2890

    
2891
        # Check whether all running instances are primary for the node. (This
2892
        # can no longer be done from _VerifyInstance below, since some of the
2893
        # wrong instances could be from other node groups.)
2894
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2895

    
2896
        for inst in non_primary_inst:
2897
          test = inst in self.all_inst_info
2898
          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
2899
                   "instance should not run on node %s", node_i.name)
2900
          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
2901
                   "node is running unknown instance %s", inst)
2902

    
2903
    for node, result in extra_lv_nvinfo.items():
2904
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2905
                              node_image[node], vg_name)
2906

    
2907
    feedback_fn("* Verifying instance status")
2908
    for instance in self.my_inst_names:
2909
      if verbose:
2910
        feedback_fn("* Verifying instance %s" % instance)
2911
      inst_config = self.my_inst_info[instance]
2912
      self._VerifyInstance(instance, inst_config, node_image,
2913
                           instdisk[instance])
2914
      inst_nodes_offline = []
2915

    
2916
      pnode = inst_config.primary_node
2917
      pnode_img = node_image[pnode]
2918
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2919
               self.ENODERPC, pnode, "instance %s, connection to"
2920
               " primary node failed", instance)
2921

    
2922
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2923
               self.EINSTANCEBADNODE, instance,
2924
               "instance is marked as running and lives on offline node %s",
2925
               inst_config.primary_node)
2926

    
2927
      # If the instance is non-redundant, we cannot survive losing its primary
2928
      # node, so we are not N+1 compliant. On the other hand, we have no disk
2929
      # templates with more than one secondary, so that situation is not well
2930
      # supported either.
2931
      # FIXME: does not support file-backed instances
2932
      if not inst_config.secondary_nodes:
2933
        i_non_redundant.append(instance)
2934

    
2935
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2936
               instance, "instance has multiple secondary nodes: %s",
2937
               utils.CommaJoin(inst_config.secondary_nodes),
2938
               code=self.ETYPE_WARNING)
2939

    
2940
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2941
        pnode = inst_config.primary_node
2942
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2943
        instance_groups = {}
2944

    
2945
        for node in instance_nodes:
2946
          instance_groups.setdefault(self.all_node_info[node].group,
2947
                                     []).append(node)
2948

    
2949
        pretty_list = [
2950
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2951
          # Sort so that we always list the primary node first.
2952
          for group, nodes in sorted(instance_groups.items(),
2953
                                     key=lambda (_, nodes): pnode in nodes,
2954
                                     reverse=True)]
2955

    
2956
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2957
                      instance, "instance has primary and secondary nodes in"
2958
                      " different groups: %s", utils.CommaJoin(pretty_list),
2959
                      code=self.ETYPE_WARNING)
2960

    
2961
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2962
        i_non_a_balanced.append(instance)
2963

    
2964
      for snode in inst_config.secondary_nodes:
2965
        s_img = node_image[snode]
2966
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2967
                 "instance %s, connection to secondary node failed", instance)
2968

    
2969
        if s_img.offline:
2970
          inst_nodes_offline.append(snode)
2971

    
2972
      # warn that the instance lives on offline nodes
2973
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2974
               "instance has offline secondary node(s) %s",
2975
               utils.CommaJoin(inst_nodes_offline))
2976
      # ... or ghost/non-vm_capable nodes
2977
      for node in inst_config.all_nodes:
2978
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2979
                 "instance lives on ghost node %s", node)
2980
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2981
                 instance, "instance lives on non-vm_capable node %s", node)
2982

    
2983
    feedback_fn("* Verifying orphan volumes")
2984
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2985

    
2986
    # We will get spurious "unknown volume" warnings if any node of this group
2987
    # is secondary for an instance whose primary is in another group. To avoid
2988
    # them, we find these instances and add their volumes to node_vol_should.
2989
    for inst in self.all_inst_info.values():
2990
      for secondary in inst.secondary_nodes:
2991
        if (secondary in self.my_node_info
2992
            and inst.name not in self.my_inst_info):
2993
          inst.MapLVsByNode(node_vol_should)
2994
          break
2995

    
2996
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2997

    
2998
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2999
      feedback_fn("* Verifying N+1 Memory redundancy")
3000
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3001

    
3002
    feedback_fn("* Other Notes")
3003
    if i_non_redundant:
3004
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3005
                  % len(i_non_redundant))
3006

    
3007
    if i_non_a_balanced:
3008
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3009
                  % len(i_non_a_balanced))
3010

    
3011
    if n_offline:
3012
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3013

    
3014
    if n_drained:
3015
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3016

    
3017
    return not self.bad
3018

    
3019
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3020
    """Analyze the post-hooks' result
3021

3022
    This method analyses the hook result, handles it, and sends some
3023
    nicely-formatted feedback back to the user.
3024

3025
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3026
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3027
    @param hooks_results: the results of the multi-node hooks rpc call
3028
    @param feedback_fn: function used to send feedback back to the caller
3029
    @param lu_result: previous Exec result
3030
    @return: the new Exec result, based on the previous result
3031
        and hook results
3032

3033
    """
3034
    # We only really run POST phase hooks, only for non-empty groups,
3035
    # and are only interested in their results
3036
    if not self.my_node_names:
3037
      # empty node group
3038
      pass
3039
    elif phase == constants.HOOKS_PHASE_POST:
3040
      # Used to change hooks' output to proper indentation
3041
      feedback_fn("* Hooks Results")
3042
      assert hooks_results, "invalid result from hooks"
3043

    
3044
      for node_name in hooks_results:
3045
        res = hooks_results[node_name]
3046
        msg = res.fail_msg
3047
        test = msg and not res.offline
3048
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
3049
                      "Communication failure in hooks execution: %s", msg)
3050
        if res.offline or msg:
3051
          # No need to investigate payload if node is offline or gave
3052
          # an error.
3053
          continue
3054
        for script, hkr, output in res.payload:
3055
          test = hkr == constants.HKR_FAIL
3056
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
3057
                        "Script %s failed, output:", script)
3058
          if test:
3059
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3060
            feedback_fn("%s" % output)
3061
            lu_result = False
3062

    
3063
    return lu_result
3064

    
3065

    
3066
class LUClusterVerifyDisks(NoHooksLU):
3067
  """Verifies the cluster disks status.
3068

3069
  """
3070
  REQ_BGL = False
3071

    
3072
  def ExpandNames(self):
3073
    self.share_locks = _ShareAll()
3074
    self.needed_locks = {
3075
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3076
      }
3077

    
3078
  def Exec(self, feedback_fn):
3079
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3080

    
3081
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3082
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3083
                           for group in group_names])
3084

    
3085

    
3086
class LUGroupVerifyDisks(NoHooksLU):
3087
  """Verifies the status of all disks in a node group.
3088

3089
  """
3090
  REQ_BGL = False
3091

    
3092
  def ExpandNames(self):
3093
    # Raises errors.OpPrereqError on its own if group can't be found
3094
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3095

    
3096
    self.share_locks = _ShareAll()
3097
    self.needed_locks = {
3098
      locking.LEVEL_INSTANCE: [],
3099
      locking.LEVEL_NODEGROUP: [],
3100
      locking.LEVEL_NODE: [],
3101
      }
3102

    
3103
  def DeclareLocks(self, level):
3104
    if level == locking.LEVEL_INSTANCE:
3105
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3106

    
3107
      # Lock instances optimistically, needs verification once node and group
3108
      # locks have been acquired
3109
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3110
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3111

    
3112
    elif level == locking.LEVEL_NODEGROUP:
3113
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3114

    
3115
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3116
        set([self.group_uuid] +
3117
            # Lock all groups used by instances optimistically; this requires
3118
            # going via the node before it's locked, requiring verification
3119
            # later on
3120
            [group_uuid
3121
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3122
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3123

    
3124
    elif level == locking.LEVEL_NODE:
3125
      # This will only lock the nodes in the group to be verified which contain
3126
      # actual instances
3127
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3128
      self._LockInstancesNodes()
3129

    
3130
      # Lock all nodes in group to be verified
3131
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3132
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3133
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3134

    
3135
  def CheckPrereq(self):
3136
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3137
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3138
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3139

    
3140
    assert self.group_uuid in owned_groups
3141

    
3142
    # Check if locked instances are still correct
3143
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3144

    
3145
    # Get instance information
3146
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3147

    
3148
    # Check if node groups for locked instances are still correct
3149
    _CheckInstancesNodeGroups(self.cfg, self.instances,
3150
                              owned_groups, owned_nodes, self.group_uuid)
3151

    
3152
  def Exec(self, feedback_fn):
3153
    """Verify integrity of cluster disks.
3154

3155
    @rtype: tuple of three items
3156
    @return: a tuple of (dict of node-to-node_error, list of instances
3157
        which need activate-disks, dict of instance: (node, volume) for
3158
        missing volumes)
3159

3160
    """
3161
    res_nodes = {}
3162
    res_instances = set()
3163
    res_missing = {}
3164

    
3165
    nv_dict = _MapInstanceDisksToNodes([inst
3166
                                        for inst in self.instances.values()
3167
                                        if inst.admin_up])
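    # nv_dict maps (node name, LV name) pairs to the owning instance, covering
    # every volume of the instances marked as running in this group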
3168

    
3169
    if nv_dict:
3170
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3171
                             set(self.cfg.GetVmCapableNodeList()))
3172

    
3173
      node_lvs = self.rpc.call_lv_list(nodes, [])
3174

    
3175
      for (node, node_res) in node_lvs.items():
3176
        if node_res.offline:
3177
          continue
3178

    
3179
        msg = node_res.fail_msg
3180
        if msg:
3181
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3182
          res_nodes[node] = msg
3183
          continue
3184

    
3185
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3186
          inst = nv_dict.pop((node, lv_name), None)
3187
          if not (lv_online or inst is None):
3188
            res_instances.add(inst)
3189

    
3190
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3191
      # better
3192
      for key, inst in nv_dict.iteritems():
3193
        res_missing.setdefault(inst, []).append(list(key))
3194

    
3195
    return (res_nodes, list(res_instances), res_missing)
3196

    
3197

    
3198
class LUClusterRepairDiskSizes(NoHooksLU):
3199
  """Verifies the cluster disks sizes.
3200

3201
  """
3202
  REQ_BGL = False
3203

    
3204
  def ExpandNames(self):
3205
    if self.op.instances:
3206
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3207
      self.needed_locks = {
3208
        locking.LEVEL_NODE: [],
3209
        locking.LEVEL_INSTANCE: self.wanted_names,
3210
        }
3211
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3212
    else:
3213
      self.wanted_names = None
3214
      self.needed_locks = {
3215
        locking.LEVEL_NODE: locking.ALL_SET,
3216
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3217
        }
3218
    self.share_locks = {
3219
      locking.LEVEL_NODE: 1,
3220
      locking.LEVEL_INSTANCE: 0,
3221
      }
3222

    
3223
  def DeclareLocks(self, level):
3224
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
3225
      self._LockInstancesNodes(primary_only=True)
3226

    
3227
  def CheckPrereq(self):
3228
    """Check prerequisites.
3229

3230
    This only checks the optional instance list against the existing names.
3231

3232
    """
3233
    if self.wanted_names is None:
3234
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3235

    
3236
    self.wanted_instances = \
3237
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3238

    
3239
  def _EnsureChildSizes(self, disk):
3240
    """Ensure children of the disk have the needed disk size.
3241

3242
    This is valid mainly for DRBD8 and fixes an issue where the
3243
    children have a smaller disk size.
3244

3245
    @param disk: an L{ganeti.objects.Disk} object
3246

3247
    """
3248
    if disk.dev_type == constants.LD_DRBD8:
3249
      assert disk.children, "Empty children for DRBD8?"
3250
      fchild = disk.children[0]
3251
      mismatch = fchild.size < disk.size
3252
      if mismatch:
3253
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3254
                     fchild.size, disk.size)
3255
        fchild.size = disk.size
3256

    
3257
      # and we recurse on this child only, not on the metadev
3258
      return self._EnsureChildSizes(fchild) or mismatch
3259
    else:
3260
      return False
3261

    
3262
  def Exec(self, feedback_fn):
3263
    """Verify the size of cluster disks.
3264

3265
    """
3266
    # TODO: check child disks too
3267
    # TODO: check differences in size between primary/secondary nodes
3268
    per_node_disks = {}
3269
    for instance in self.wanted_instances:
3270
      pnode = instance.primary_node
3271
      if pnode not in per_node_disks:
3272
        per_node_disks[pnode] = []
3273
      for idx, disk in enumerate(instance.disks):
3274
        per_node_disks[pnode].append((instance, idx, disk))
3275

    
3276
    changed = []
3277
    for node, dskl in per_node_disks.items():
3278
      newl = [v[2].Copy() for v in dskl]
3279
      for dsk in newl:
3280
        self.cfg.SetDiskID(dsk, node)
3281
      result = self.rpc.call_blockdev_getsize(node, newl)
3282
      if result.fail_msg:
3283
        self.LogWarning("Failure in blockdev_getsize call to node"
3284
                        " %s, ignoring", node)
3285
        continue
3286
      if len(result.payload) != len(dskl):
3287
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3288
                        " result.payload=%s", node, len(dskl), result.payload)
3289
        self.LogWarning("Invalid result from node %s, ignoring node results",
3290
                        node)
3291
        continue
3292
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3293
        if size is None:
3294
          self.LogWarning("Disk %d of instance %s did not return size"
3295
                          " information, ignoring", idx, instance.name)
3296
          continue
3297
        if not isinstance(size, (int, long)):
3298
          self.LogWarning("Disk %d of instance %s did not return valid"
3299
                          " size information, ignoring", idx, instance.name)
3300
          continue
3301
        size = size >> 20
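        # (blockdev_getsize reports the size in bytes; the shift above
        # converts it to MiB, the unit used for disk.size)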
3302
        if size != disk.size:
3303
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3304
                       " correcting: recorded %d, actual %d", idx,
3305
                       instance.name, disk.size, size)
3306
          disk.size = size
3307
          self.cfg.Update(instance, feedback_fn)
3308
          changed.append((instance.name, idx, size))
3309
        if self._EnsureChildSizes(disk):
3310
          self.cfg.Update(instance, feedback_fn)
3311
          changed.append((instance.name, idx, disk.size))
3312
    return changed
3313

    
3314

    
3315
class LUClusterRename(LogicalUnit):
3316
  """Rename the cluster.
3317

3318
  """
3319
  HPATH = "cluster-rename"
3320
  HTYPE = constants.HTYPE_CLUSTER
3321

    
3322
  def BuildHooksEnv(self):
3323
    """Build hooks env.
3324

3325
    """
3326
    return {
3327
      "OP_TARGET": self.cfg.GetClusterName(),
3328
      "NEW_NAME": self.op.name,
3329
      }
3330

    
3331
  def BuildHooksNodes(self):
3332
    """Build hooks nodes.
3333

3334
    """
3335
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3336

    
3337
  def CheckPrereq(self):
3338
    """Verify that the passed name is a valid one.
3339

3340
    """
3341
    hostname = netutils.GetHostname(name=self.op.name,
3342
                                    family=self.cfg.GetPrimaryIPFamily())
3343

    
3344
    new_name = hostname.name
3345
    self.ip = new_ip = hostname.ip
3346
    old_name = self.cfg.GetClusterName()
3347
    old_ip = self.cfg.GetMasterIP()
3348
    if new_name == old_name and new_ip == old_ip:
3349
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3350
                                 " cluster has changed",
3351
                                 errors.ECODE_INVAL)
3352
    if new_ip != old_ip:
3353
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3354
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3355
                                   " reachable on the network" %
3356
                                   new_ip, errors.ECODE_NOTUNIQUE)
3357

    
3358
    self.op.name = new_name
3359

    
3360
  def Exec(self, feedback_fn):
3361
    """Rename the cluster.
3362

3363
    """
3364
    clustername = self.op.name
3365
    ip = self.ip
3366

    
3367
    # shutdown the master IP
3368
    master = self.cfg.GetMasterNode()
3369
    result = self.rpc.call_node_deactivate_master_ip(master)
3370
    result.Raise("Could not disable the master role")
3371

    
3372
    try:
3373
      cluster = self.cfg.GetClusterInfo()
3374
      cluster.cluster_name = clustername
3375
      cluster.master_ip = ip
3376
      self.cfg.Update(cluster, feedback_fn)
3377

    
3378
      # update the known hosts file
3379
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3380
      node_list = self.cfg.GetOnlineNodeList()
3381
      try:
3382
        node_list.remove(master)
3383
      except ValueError:
3384
        pass
3385
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3386
    finally:
3387
      result = self.rpc.call_node_activate_master_ip(master)
3388
      msg = result.fail_msg
3389
      if msg:
3390
        self.LogWarning("Could not re-enable the master role on"
3391
                        " the master, please restart manually: %s", msg)
3392

    
3393
    return clustername
3394

    
3395

    
3396
class LUClusterSetParams(LogicalUnit):
3397
  """Change the parameters of the cluster.
3398

3399
  """
3400
  HPATH = "cluster-modify"
3401
  HTYPE = constants.HTYPE_CLUSTER
3402
  REQ_BGL = False
3403

    
3404
  def CheckArguments(self):
3405
    """Check parameters
3406

3407
    """
3408
    if self.op.uid_pool:
3409
      uidpool.CheckUidPool(self.op.uid_pool)
3410

    
3411
    if self.op.add_uids:
3412
      uidpool.CheckUidPool(self.op.add_uids)
3413

    
3414
    if self.op.remove_uids:
3415
      uidpool.CheckUidPool(self.op.remove_uids)
3416

    
3417
  def ExpandNames(self):
3418
    # FIXME: in the future maybe other cluster params won't require checking on
3419
    # all nodes to be modified.
3420
    self.needed_locks = {
3421
      locking.LEVEL_NODE: locking.ALL_SET,
3422
    }
3423
    self.share_locks[locking.LEVEL_NODE] = 1
3424

    
3425
  def BuildHooksEnv(self):
3426
    """Build hooks env.
3427

3428
    """
3429
    return {
3430
      "OP_TARGET": self.cfg.GetClusterName(),
3431
      "NEW_VG_NAME": self.op.vg_name,
3432
      }
3433

    
3434
  def BuildHooksNodes(self):
3435
    """Build hooks nodes.
3436

3437
    """
3438
    mn = self.cfg.GetMasterNode()
3439
    return ([mn], [mn])
3440

    
3441
  def CheckPrereq(self):
3442
    """Check prerequisites.
3443

3444
    This checks that the given parameters don't conflict and that
3445
    the given volume group is valid.
3446

3447
    """
3448
    if self.op.vg_name is not None and not self.op.vg_name:
3449
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3450
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3451
                                   " instances exist", errors.ECODE_INVAL)
3452

    
3453
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3454
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3455
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3456
                                   " drbd-based instances exist",
3457
                                   errors.ECODE_INVAL)
3458

    
3459
    node_list = self.owned_locks(locking.LEVEL_NODE)
3460

    
3461
    # if vg_name not None, checks given volume group on all nodes
3462
    if self.op.vg_name:
3463
      vglist = self.rpc.call_vg_list(node_list)
3464
      for node in node_list:
3465
        msg = vglist[node].fail_msg
3466
        if msg:
3467
          # ignoring down node
3468
          self.LogWarning("Error while gathering data on node %s"
3469
                          " (ignoring node): %s", node, msg)
3470
          continue
3471
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3472
                                              self.op.vg_name,
3473
                                              constants.MIN_VG_SIZE)
3474
        if vgstatus:
3475
          raise errors.OpPrereqError("Error on node '%s': %s" %
3476
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3477

    
3478
    if self.op.drbd_helper:
3479
      # checks given drbd helper on all nodes
3480
      helpers = self.rpc.call_drbd_helper(node_list)
3481
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3482
        if ninfo.offline:
3483
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3484
          continue
3485
        msg = helpers[node].fail_msg
3486
        if msg:
3487
          raise errors.OpPrereqError("Error checking drbd helper on node"
3488
                                     " '%s': %s" % (node, msg),
3489
                                     errors.ECODE_ENVIRON)
3490
        node_helper = helpers[node].payload
3491
        if node_helper != self.op.drbd_helper:
3492
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3493
                                     (node, node_helper), errors.ECODE_ENVIRON)
3494

    
3495
    self.cluster = cluster = self.cfg.GetClusterInfo()
3496
    # validate params changes
3497
    if self.op.beparams:
3498
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3499
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3500

    
3501
    if self.op.ndparams:
3502
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3503
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3504

    
3505
      # TODO: we need a more general way to handle resetting
3506
      # cluster-level parameters to default values
3507
      if self.new_ndparams["oob_program"] == "":
3508
        self.new_ndparams["oob_program"] = \
3509
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3510

    
3511
    if self.op.nicparams:
3512
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3513
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3514
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3515
      nic_errors = []
3516

    
3517
      # check all instances for consistency
3518
      for instance in self.cfg.GetAllInstancesInfo().values():
3519
        for nic_idx, nic in enumerate(instance.nics):
3520
          params_copy = copy.deepcopy(nic.nicparams)
3521
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3522

    
3523
          # check parameter syntax
3524
          try:
3525
            objects.NIC.CheckParameterSyntax(params_filled)
3526
          except errors.ConfigurationError, err:
3527
            nic_errors.append("Instance %s, nic/%d: %s" %
3528
                              (instance.name, nic_idx, err))
3529

    
3530
          # if we're moving instances to routed, check that they have an ip
3531
          target_mode = params_filled[constants.NIC_MODE]
3532
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3533
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3534
                              " address" % (instance.name, nic_idx))
3535
      if nic_errors:
3536
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3537
                                   "\n".join(nic_errors))
3538

    
3539
    # hypervisor list/parameters
3540
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
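    # start from a copy of the current cluster-wide hypervisor parameters and
    # overlay the requested changes on top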
3541
    if self.op.hvparams:
3542
      for hv_name, hv_dict in self.op.hvparams.items():
3543
        if hv_name not in self.new_hvparams:
3544
          self.new_hvparams[hv_name] = hv_dict
3545
        else:
3546
          self.new_hvparams[hv_name].update(hv_dict)
3547

    
3548
    # os hypervisor parameters
3549
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3550
    if self.op.os_hvp:
3551
      for os_name, hvs in self.op.os_hvp.items():
3552
        if os_name not in self.new_os_hvp:
3553
          self.new_os_hvp[os_name] = hvs
3554
        else:
3555
          for hv_name, hv_dict in hvs.items():
3556
            if hv_name not in self.new_os_hvp[os_name]:
3557
              self.new_os_hvp[os_name][hv_name] = hv_dict
3558
            else:
3559
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3560

    
3561
    # os parameters
3562
    self.new_osp = objects.FillDict(cluster.osparams, {})
3563
    if self.op.osparams:
3564
      for os_name, osp in self.op.osparams.items():
3565
        if os_name not in self.new_osp:
3566
          self.new_osp[os_name] = {}
3567

    
3568
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3569
                                                  use_none=True)
3570

    
3571
        if not self.new_osp[os_name]:
3572
          # we removed all parameters
3573
          del self.new_osp[os_name]
3574
        else:
3575
          # check the parameter validity (remote check)
3576
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3577
                         os_name, self.new_osp[os_name])
3578

    
3579
    # changes to the hypervisor list
3580
    if self.op.enabled_hypervisors is not None:
3581
      self.hv_list = self.op.enabled_hypervisors
3582
      for hv in self.hv_list:
3583
        # if the hypervisor doesn't already exist in the cluster
3584
        # hvparams, we initialize it to empty, and then (in both
3585
        # cases) we make sure to fill the defaults, as we might not
3586
        # have a complete defaults list if the hypervisor wasn't
3587
        # enabled before
3588
        if hv not in new_hvp:
3589
          new_hvp[hv] = {}
3590
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3591
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3592
    else:
3593
      self.hv_list = cluster.enabled_hypervisors
3594

    
3595
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3596
      # either the enabled list has changed, or the parameters have; validate them
3597
      for hv_name, hv_params in self.new_hvparams.items():
3598
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3599
            (self.op.enabled_hypervisors and
3600
             hv_name in self.op.enabled_hypervisors)):
3601
          # either this is a new hypervisor, or its parameters have changed
3602
          hv_class = hypervisor.GetHypervisor(hv_name)
3603
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3604
          hv_class.CheckParameterSyntax(hv_params)
3605
          _CheckHVParams(self, node_list, hv_name, hv_params)
3606

    
3607
    if self.op.os_hvp:
3608
      # no need to check any newly-enabled hypervisors, since the
3609
      # defaults have already been checked in the above code-block
3610
      for os_name, os_hvp in self.new_os_hvp.items():
3611
        for hv_name, hv_params in os_hvp.items():
3612
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3613
          # we need to fill in the new os_hvp on top of the actual hv_p
3614
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3615
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3616
          hv_class = hypervisor.GetHypervisor(hv_name)
3617
          hv_class.CheckParameterSyntax(new_osp)
3618
          _CheckHVParams(self, node_list, hv_name, new_osp)
3619

    
3620
    if self.op.default_iallocator:
3621
      alloc_script = utils.FindFile(self.op.default_iallocator,
3622
                                    constants.IALLOCATOR_SEARCH_PATH,
3623
                                    os.path.isfile)
3624
      if alloc_script is None:
3625
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3626
                                   " specified" % self.op.default_iallocator,
3627
                                   errors.ECODE_INVAL)
3628

    
3629
  def Exec(self, feedback_fn):
3630
    """Change the parameters of the cluster.
3631

3632
    """
3633
    if self.op.vg_name is not None:
3634
      new_volume = self.op.vg_name
3635
      if not new_volume:
3636
        new_volume = None
3637
      if new_volume != self.cfg.GetVGName():
3638
        self.cfg.SetVGName(new_volume)
3639
      else:
3640
        feedback_fn("Cluster LVM configuration already in desired"
3641
                    " state, not changing")
3642
    if self.op.drbd_helper is not None:
3643
      new_helper = self.op.drbd_helper
3644
      if not new_helper:
3645
        new_helper = None
3646
      if new_helper != self.cfg.GetDRBDHelper():
3647
        self.cfg.SetDRBDHelper(new_helper)
3648
      else:
3649
        feedback_fn("Cluster DRBD helper already in desired state,"
3650
                    " not changing")
3651
    if self.op.hvparams:
3652
      self.cluster.hvparams = self.new_hvparams
3653
    if self.op.os_hvp:
3654
      self.cluster.os_hvp = self.new_os_hvp
3655
    if self.op.enabled_hypervisors is not None:
3656
      self.cluster.hvparams = self.new_hvparams
3657
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3658
    if self.op.beparams:
3659
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3660
    if self.op.nicparams:
3661
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3662
    if self.op.osparams:
3663
      self.cluster.osparams = self.new_osp
3664
    if self.op.ndparams:
3665
      self.cluster.ndparams = self.new_ndparams
3666

    
3667
    if self.op.candidate_pool_size is not None:
3668
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3669
      # we need to update the pool size here, otherwise the save will fail
3670
      _AdjustCandidatePool(self, [])
3671

    
3672
    if self.op.maintain_node_health is not None:
3673
      self.cluster.maintain_node_health = self.op.maintain_node_health
3674

    
3675
    if self.op.prealloc_wipe_disks is not None:
3676
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3677

    
3678
    if self.op.add_uids is not None:
3679
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3680

    
3681
    if self.op.remove_uids is not None:
3682
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3683

    
3684
    if self.op.uid_pool is not None:
3685
      self.cluster.uid_pool = self.op.uid_pool
3686

    
3687
    if self.op.default_iallocator is not None:
3688
      self.cluster.default_iallocator = self.op.default_iallocator
3689

    
3690
    if self.op.reserved_lvs is not None:
3691
      self.cluster.reserved_lvs = self.op.reserved_lvs
3692

    
3693
    def helper_os(aname, mods, desc):
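      # mods is a list of (action, OS name) pairs, e.g.
      # [(constants.DDM_ADD, "my-os")]; the OS name here is purely
      # illustrative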
3694
      desc += " OS list"
3695
      lst = getattr(self.cluster, aname)
3696
      for key, val in mods:
3697
        if key == constants.DDM_ADD:
3698
          if val in lst:
3699
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3700
          else:
3701
            lst.append(val)
3702
        elif key == constants.DDM_REMOVE:
3703
          if val in lst:
3704
            lst.remove(val)
3705
          else:
3706
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3707
        else:
3708
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3709

    
3710
    if self.op.hidden_os:
3711
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3712

    
3713
    if self.op.blacklisted_os:
3714
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3715

    
3716
    if self.op.master_netdev:
3717
      master = self.cfg.GetMasterNode()
3718
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3719
                  self.cluster.master_netdev)
3720
      result = self.rpc.call_node_deactivate_master_ip(master)
3721
      result.Raise("Could not disable the master ip")
3722
      feedback_fn("Changing master_netdev from %s to %s" %
3723
                  (self.cluster.master_netdev, self.op.master_netdev))
3724
      self.cluster.master_netdev = self.op.master_netdev
3725

    
3726
    self.cfg.Update(self.cluster, feedback_fn)
3727

    
3728
    if self.op.master_netdev:
3729
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3730
                  self.op.master_netdev)
3731
      result = self.rpc.call_node_activate_master_ip(master)
3732
      if result.fail_msg:
3733
        self.LogWarning("Could not re-enable the master ip on"
3734
                        " the master, please restart manually: %s",
3735
                        result.fail_msg)
3736

    
3737

    
3738
def _UploadHelper(lu, nodes, fname):
3739
  """Helper for uploading a file and showing warnings.
3740

3741
  """
3742
  if os.path.exists(fname):
3743
    result = lu.rpc.call_upload_file(nodes, fname)
3744
    for to_node, to_result in result.items():
3745
      msg = to_result.fail_msg
3746
      if msg:
3747
        msg = ("Copy of file %s to node %s failed: %s" %
3748
               (fname, to_node, msg))
3749
        lu.proc.LogWarning(msg)
3750

    
3751

    
3752
def _ComputeAncillaryFiles(cluster, redist):
3753
  """Compute files external to Ganeti which need to be consistent.
3754

3755
  @type redist: boolean
3756
  @param redist: Whether to include files which need to be redistributed
3757

3758
  """
3759
  # Compute files for all nodes
3760
  files_all = set([
3761
    constants.SSH_KNOWN_HOSTS_FILE,
3762
    constants.CONFD_HMAC_KEY,
3763
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3764
    constants.RAPI_USERS_FILE,
3765
    ])
3766

    
3767
  if not redist:
3768
    files_all.update(constants.ALL_CERT_FILES)
3769
    files_all.update(ssconf.SimpleStore().GetFileList())
3770
  else:
3771
    # we need to ship at least the RAPI certificate
3772
    files_all.add(constants.RAPI_CERT_FILE)
3773

    
3774
  if cluster.modify_etc_hosts:
3775
    files_all.add(constants.ETC_HOSTS)
3776

    
3777
  # Files which are optional; these must:
3778
  # - be present in one other category as well
3779
  # - either exist or not exist on all nodes of that category (mc, vm all)
3780
  files_opt = set([
3781
    constants.RAPI_USERS_FILE,
3782
    ])
3783

    
3784
  # Files which should only be on master candidates
3785
  files_mc = set()
3786
  if not redist:
3787
    files_mc.add(constants.CLUSTER_CONF_FILE)
3788

    
3789
  # Files which should only be on VM-capable nodes
3790
  files_vm = set(filename
3791
    for hv_name in cluster.enabled_hypervisors
3792
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
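  # Each hypervisor's GetAncillaryFiles() yields a pair of file lists; index
  # [0] above collects the files for VM-capable nodes, index [1] below the
  # optional ones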
3793

    
3794
  files_opt |= set(filename
3795
    for hv_name in cluster.enabled_hypervisors
3796
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3797

    
3798
  # Filenames in each category must be unique
3799
  all_files_set = files_all | files_mc | files_vm
3800
  assert (len(all_files_set) ==
3801
          sum(map(len, [files_all, files_mc, files_vm]))), \
3802
         "Found file listed in more than one file list"
3803

    
3804
  # Optional files must be present in one other category
3805
  assert all_files_set.issuperset(files_opt), \
3806
         "Optional file not in a different required list"
3807

    
3808
  return (files_all, files_opt, files_mc, files_vm)
3809

    
3810

    
3811
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3812
  """Distribute additional files which are part of the cluster configuration.
3813

3814
  ConfigWriter takes care of distributing the config and ssconf files, but
3815
  there are more files which should be distributed to all nodes. This function
3816
  makes sure those are copied.
3817

3818
  @param lu: calling logical unit
3819
  @param additional_nodes: list of nodes not in the config to distribute to
3820
  @type additional_vm: boolean
3821
  @param additional_vm: whether the additional nodes are vm-capable or not
3822

3823
  """
3824
  # Gather target nodes
3825
  cluster = lu.cfg.GetClusterInfo()
3826
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3827

    
3828
  online_nodes = lu.cfg.GetOnlineNodeList()
3829
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3830

    
3831
  if additional_nodes is not None:
3832
    online_nodes.extend(additional_nodes)
3833
    if additional_vm:
3834
      vm_nodes.extend(additional_nodes)
3835

    
3836
  # Never distribute to master node
3837
  for nodelist in [online_nodes, vm_nodes]:
3838
    if master_info.name in nodelist:
3839
      nodelist.remove(master_info.name)
3840

    
3841
  # Gather file lists
3842
  (files_all, _, files_mc, files_vm) = \
3843
    _ComputeAncillaryFiles(cluster, True)
3844

    
3845
  # Never re-distribute configuration file from here
3846
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3847
              constants.CLUSTER_CONF_FILE in files_vm)
3848
  assert not files_mc, "Master candidates not handled in this function"
3849

    
3850
  filemap = [
3851
    (online_nodes, files_all),
3852
    (vm_nodes, files_vm),
3853
    ]
3854

    
3855
  # Upload the files
3856
  for (node_list, files) in filemap:
3857
    for fname in files:
3858
      _UploadHelper(lu, node_list, fname)
3859

    
3860

    
3861
class LUClusterRedistConf(NoHooksLU):
3862
  """Force the redistribution of cluster configuration.
3863

3864
  This is a very simple LU.
3865

3866
  """
3867
  REQ_BGL = False
3868

    
3869
  def ExpandNames(self):
3870
    self.needed_locks = {
3871
      locking.LEVEL_NODE: locking.ALL_SET,
3872
    }
3873
    self.share_locks[locking.LEVEL_NODE] = 1
3874

    
3875
  def Exec(self, feedback_fn):
3876
    """Redistribute the configuration.
3877

3878
    """
3879
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3880
    _RedistributeAncillaryFiles(self)
3881

    
3882

    
3883
class LUClusterActivateMasterIp(NoHooksLU):
3884
  """Activate the master IP on the master node.
3885

3886
  """
3887
  def Exec(self, feedback_fn):
3888
    """Activate the master IP.
3889

3890
    """
3891
    master = self.cfg.GetMasterNode()
3892
    result = self.rpc.call_node_activate_master_ip(master)
3893
    result.Raise("Could not activate the master IP")
3894

    
3895

    
3896
class LUClusterDeactivateMasterIp(NoHooksLU):
3897
  """Deactivate the master IP on the master node.
3898

3899
  """
3900
  def Exec(self, feedback_fn):
3901
    """Deactivate the master IP.
3902

3903
    """
3904
    master = self.cfg.GetMasterNode()
3905
    result = self.rpc.call_node_deactivate_master_ip(master)
3906
    result.Raise("Could not deactivate the master IP")
3907

    
3908

    
3909
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3910
  """Sleep and poll for an instance's disk to sync.
3911

3912
  """
3913
  if not instance.disks or disks is not None and not disks:
3914
    return True
3915

    
3916
  disks = _ExpandCheckDisks(instance, disks)
3917

    
3918
  if not oneshot:
3919
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3920

    
3921
  node = instance.primary_node
3922

    
3923
  for dev in disks:
3924
    lu.cfg.SetDiskID(dev, node)
3925

    
3926
  # TODO: Convert to utils.Retry
3927

    
3928
  retries = 0
3929
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3930
  while True:
3931
    max_time = 0
3932
    done = True
3933
    cumul_degraded = False
3934
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3935
    msg = rstats.fail_msg
3936
    if msg:
3937
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3938
      retries += 1
3939
      if retries >= 10:
3940
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3941
                                 " aborting." % node)
3942
      time.sleep(6)
3943
      continue
3944
    rstats = rstats.payload
3945
    retries = 0
3946
    for i, mstat in enumerate(rstats):
3947
      if mstat is None:
3948
        lu.LogWarning("Can't compute data for node %s/%s",
3949
                           node, disks[i].iv_name)
3950
        continue
3951

    
3952
      cumul_degraded = (cumul_degraded or
3953
                        (mstat.is_degraded and mstat.sync_percent is None))
3954
      if mstat.sync_percent is not None:
3955
        done = False
3956
        if mstat.estimated_time is not None:
3957
          rem_time = ("%s remaining (estimated)" %
3958
                      utils.FormatSeconds(mstat.estimated_time))
3959
          max_time = mstat.estimated_time
3960
        else:
3961
          rem_time = "no time estimate"
3962
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3963
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3964

    
3965
    # if we're done but degraded, let's do a few small retries, to
3966
    # make sure we see a stable and not transient situation; therefore
3967
    # we force restart of the loop
3968
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3969
      logging.info("Degraded disks found, %d retries left", degr_retries)
3970
      degr_retries -= 1
3971
      time.sleep(1)
3972
      continue
3973

    
3974
    if done or oneshot:
3975
      break
3976

    
3977
    time.sleep(min(60, max_time))
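    # poll again after at most a minute, or sooner if the longest estimated
    # remaining sync time is shorter than that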
3978

    
3979
  if done:
3980
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
3981
  return not cumul_degraded
3982

    
3983

    
3984
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3985
  """Check that mirrors are not degraded.
3986

3987
  The ldisk parameter, if True, will change the test from the
3988
  is_degraded attribute (which represents overall non-ok status for
3989
  the device(s)) to the ldisk (representing the local storage status).
3990

3991
  """
3992
  lu.cfg.SetDiskID(dev, node)
3993

    
3994
  result = True
3995

    
3996
  if on_primary or dev.AssembleOnSecondary():
3997
    rstats = lu.rpc.call_blockdev_find(node, dev)
3998
    msg = rstats.fail_msg
3999
    if msg:
4000
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
4001
      result = False
4002
    elif not rstats.payload:
4003
      lu.LogWarning("Can't find disk on node %s", node)
4004
      result = False
4005
    else:
4006
      if ldisk:
4007
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4008
      else:
4009
        result = result and not rstats.payload.is_degraded
4010

    
4011
  if dev.children:
4012
    for child in dev.children:
4013
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
4014

    
4015
  return result
4016

    
4017

    
4018
class LUOobCommand(NoHooksLU):
4019
  """Logical unit for OOB handling.
4020

4021
  """
4022
  REQ_BGL = False
4023
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
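  # OOB commands for which the master node is skipped (or refused when it is
  # named explicitly); see CheckPrereq below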
4024

    
4025
  def ExpandNames(self):
4026
    """Gather locks we need.
4027

4028
    """
4029
    if self.op.node_names:
4030
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
4031
      lock_names = self.op.node_names
4032
    else:
4033
      lock_names = locking.ALL_SET
4034

    
4035
    self.needed_locks = {
4036
      locking.LEVEL_NODE: lock_names,
4037
      }
4038

    
4039
  def CheckPrereq(self):
4040
    """Check prerequisites.
4041

4042
    This checks:
4043
     - the node exists in the configuration
4044
     - OOB is supported
4045

4046
    Any errors are signaled by raising errors.OpPrereqError.
4047

4048
    """
4049
    self.nodes = []
4050
    self.master_node = self.cfg.GetMasterNode()
4051

    
4052
    assert self.op.power_delay >= 0.0
4053

    
4054
    if self.op.node_names:
4055
      if (self.op.command in self._SKIP_MASTER and
4056
          self.master_node in self.op.node_names):
4057
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
4058
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
4059

    
4060
        if master_oob_handler:
4061
          additional_text = ("run '%s %s %s' if you want to operate on the"
4062
                             " master regardless") % (master_oob_handler,
4063
                                                      self.op.command,
4064
                                                      self.master_node)
4065
        else:
4066
          additional_text = "it does not support out-of-band operations"
4067

    
4068
        raise errors.OpPrereqError(("Operating on the master node %s is not"
4069
                                    " allowed for %s; %s") %
4070
                                   (self.master_node, self.op.command,
4071
                                    additional_text), errors.ECODE_INVAL)
4072
    else:
4073
      self.op.node_names = self.cfg.GetNodeList()
4074
      if self.op.command in self._SKIP_MASTER:
4075
        self.op.node_names.remove(self.master_node)
4076

    
4077
    if self.op.command in self._SKIP_MASTER:
4078
      assert self.master_node not in self.op.node_names
4079

    
4080
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
4081
      if node is None:
4082
        raise errors.OpPrereqError("Node %s not found" % node_name,
4083
                                   errors.ECODE_NOENT)
4084
      else:
4085
        self.nodes.append(node)
4086

    
4087
      if (not self.op.ignore_status and
4088
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
4089
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
4090