lib/cmdlib.py @ 78519c10


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43
import operator
44

    
45
from ganeti import ssh
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import hypervisor
49
from ganeti import locking
50
from ganeti import constants
51
from ganeti import objects
52
from ganeti import serializer
53
from ganeti import ssconf
54
from ganeti import uidpool
55
from ganeti import compat
56
from ganeti import masterd
57
from ganeti import netutils
58
from ganeti import query
59
from ganeti import qlang
60
from ganeti import opcodes
61
from ganeti import ht
62
from ganeti import rpc
63

    
64
import ganeti.masterd.instance # pylint: disable=W0611
65

    
66

    
67
#: Size of DRBD meta block device
68
DRBD_META_SIZE = 128
69

    
70
# States of instance
71
INSTANCE_UP = [constants.ADMINST_UP]
72
INSTANCE_DOWN = [constants.ADMINST_DOWN]
73
INSTANCE_OFFLINE = [constants.ADMINST_OFFLINE]
74
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
75
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
76

    
77

    
78
class ResultWithJobs:
79
  """Data container for LU results with jobs.
80

81
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
82
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
83
  contained in the C{jobs} attribute and include the job IDs in the opcode
84
  result.
85

86
  """
87
  def __init__(self, jobs, **kwargs):
88
    """Initializes this class.
89

90
    Additional return values can be specified as keyword arguments.
91

92
    @type jobs: list of lists of L{opcodes.OpCode}
93
    @param jobs: A list of lists of opcode objects
94

95
    """
96
    self.jobs = jobs
97
    self.other = kwargs
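
# Illustrative usage (editor example, not part of the original module): an
# LU's Exec method can hand follow-up work to the job queue by returning
# ResultWithJobs. Each inner list becomes one submitted job and any keyword
# arguments end up as additional values in the opcode result.
#
#   def Exec(self, feedback_fn):
#     jobs = [[opcodes.OpClusterVerifyConfig()]]   # one job, one opcode
#     return ResultWithJobs(jobs, submitted="cluster-verify-config")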
98

    
99

    
100
class LogicalUnit(object):
101
  """Logical Unit base class.
102

103
  Subclasses must follow these rules:
104
    - implement ExpandNames
105
    - implement CheckPrereq (except when tasklets are used)
106
    - implement Exec (except when tasklets are used)
107
    - implement BuildHooksEnv
108
    - implement BuildHooksNodes
109
    - redefine HPATH and HTYPE
110
    - optionally redefine their run requirements:
111
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
112

113
  Note that all commands require root permissions.
114

115
  @ivar dry_run_result: the value (if any) that will be returned to the caller
116
      in dry-run mode (signalled by opcode dry_run parameter)
117

118
  """
119
  HPATH = None
120
  HTYPE = None
121
  REQ_BGL = True
122

    
123
  def __init__(self, processor, op, context, rpc_runner):
124
    """Constructor for LogicalUnit.
125

126
    This needs to be overridden in derived classes in order to check op
127
    validity.
128

129
    """
130
    self.proc = processor
131
    self.op = op
132
    self.cfg = context.cfg
133
    self.glm = context.glm
134
    # readability alias
135
    self.owned_locks = context.glm.list_owned
136
    self.context = context
137
    self.rpc = rpc_runner
138
    # Dicts used to declare locking needs to mcpu
139
    self.needed_locks = None
140
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
141
    self.add_locks = {}
142
    self.remove_locks = {}
143
    # Used to force good behavior when calling helper functions
144
    self.recalculate_locks = {}
145
    # logging
146
    self.Log = processor.Log # pylint: disable=C0103
147
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
148
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
149
    self.LogStep = processor.LogStep # pylint: disable=C0103
150
    # support for dry-run
151
    self.dry_run_result = None
152
    # support for generic debug attribute
153
    if (not hasattr(self.op, "debug_level") or
154
        not isinstance(self.op.debug_level, int)):
155
      self.op.debug_level = 0
156

    
157
    # Tasklets
158
    self.tasklets = None
159

    
160
    # Validate opcode parameters and set defaults
161
    self.op.Validate(True)
162

    
163
    self.CheckArguments()
164

    
165
  def CheckArguments(self):
166
    """Check syntactic validity for the opcode arguments.
167

168
    This method is for doing a simple syntactic check and ensure
169
    validity of opcode parameters, without any cluster-related
170
    checks. While the same can be accomplished in ExpandNames and/or
171
    CheckPrereq, doing these separately is better because:
172

173
      - ExpandNames is left as purely a lock-related function
174
      - CheckPrereq is run after we have acquired locks (and possibly
175
        waited for them)
176

177
    The function is allowed to change the self.op attribute so that
178
    later methods no longer need to worry about missing parameters.
179

180
    """
181
    pass
182

    
183
  def ExpandNames(self):
184
    """Expand names for this LU.
185

186
    This method is called before starting to execute the opcode, and it should
187
    update all the parameters of the opcode to their canonical form (e.g. a
188
    short node name must be fully expanded after this method has successfully
189
    completed). This way locking, hooks, logging, etc. can work correctly.
190

191
    LUs which implement this method must also populate the self.needed_locks
192
    member, as a dict with lock levels as keys, and a list of needed lock names
193
    as values. Rules:
194

195
      - use an empty dict if you don't need any lock
196
      - if you don't need any lock at a particular level omit that level
197
      - don't put anything for the BGL level
198
      - if you want all locks at a level use locking.ALL_SET as a value
199

200
    If you need to share locks (rather than acquire them exclusively) at one
201
    level you can modify self.share_locks, setting a true value (usually 1) for
202
    that level. By default locks are not shared.
203

204
    This function can also define a list of tasklets, which then will be
205
    executed in order instead of the usual LU-level CheckPrereq and Exec
206
    functions, if those are not defined by the LU.
207

208
    Examples::
209

210
      # Acquire all nodes and one instance
211
      self.needed_locks = {
212
        locking.LEVEL_NODE: locking.ALL_SET,
213
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
214
      }
215
      # Acquire just two nodes
216
      self.needed_locks = {
217
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
218
      }
219
      # Acquire no locks
220
      self.needed_locks = {} # No, you can't leave it to the default value None
221

222
    """
223
    # The implementation of this method is mandatory only if the new LU is
224
    # concurrent, so that old LUs don't need to be changed all at the same
225
    # time.
226
    if self.REQ_BGL:
227
      self.needed_locks = {} # Exclusive LUs don't need locks.
228
    else:
229
      raise NotImplementedError
230

    
231
  def DeclareLocks(self, level):
232
    """Declare LU locking needs for a level
233

234
    While most LUs can just declare their locking needs at ExpandNames time,
235
    sometimes there's the need to calculate some locks after having acquired
236
    the ones before. This function is called just before acquiring locks at a
237
    particular level, but after acquiring the ones at lower levels, and permits
238
    such calculations. It can be used to modify self.needed_locks, and by
239
    default it does nothing.
240

241
    This function is only called if you have something already set in
242
    self.needed_locks for the level.
243

244
    @param level: Locking level which is going to be locked
245
    @type level: member of ganeti.locking.LEVELS
246

247
    """
248

    
249
  def CheckPrereq(self):
250
    """Check prerequisites for this LU.
251

252
    This method should check that the prerequisites for the execution
253
    of this LU are fulfilled. It can do internode communication, but
254
    it should be idempotent - no cluster or system changes are
255
    allowed.
256

257
    The method should raise errors.OpPrereqError in case something is
258
    not fulfilled. Its return value is ignored.
259

260
    This method should also update all the parameters of the opcode to
261
    their canonical form if it hasn't been done by ExpandNames before.
262

263
    """
264
    if self.tasklets is not None:
265
      for (idx, tl) in enumerate(self.tasklets):
266
        logging.debug("Checking prerequisites for tasklet %s/%s",
267
                      idx + 1, len(self.tasklets))
268
        tl.CheckPrereq()
269
    else:
270
      pass
271

    
272
  def Exec(self, feedback_fn):
273
    """Execute the LU.
274

275
    This method should implement the actual work. It should raise
276
    errors.OpExecError for failures that are somewhat dealt with in
277
    code, or expected.
278

279
    """
280
    if self.tasklets is not None:
281
      for (idx, tl) in enumerate(self.tasklets):
282
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
283
        tl.Exec(feedback_fn)
284
    else:
285
      raise NotImplementedError
286

    
287
  def BuildHooksEnv(self):
288
    """Build hooks environment for this LU.
289

290
    @rtype: dict
291
    @return: Dictionary containing the environment that will be used for
292
      running the hooks for this LU. The keys of the dict must not be prefixed
293
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
294
      will extend the environment with additional variables. If no environment
295
      should be defined, an empty dictionary should be returned (not C{None}).
296
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
297
      will not be called.
298

299
    """
300
    raise NotImplementedError
301

    
302
  def BuildHooksNodes(self):
303
    """Build list of nodes to run LU's hooks.
304

305
    @rtype: tuple; (list, list)
306
    @return: Tuple containing a list of node names on which the hook
307
      should run before the execution and a list of node names on which the
308
      hook should run after the execution. No nodes should be returned as an
309
      empty list (and not None).
310
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
311
      will not be called.
312

313
    """
314
    raise NotImplementedError
315

    
316
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
317
    """Notify the LU about the results of its hooks.
318

319
    This method is called every time a hooks phase is executed, and notifies
320
    the Logical Unit about the hooks' result. The LU can then use it to alter
321
    its result based on the hooks.  By default the method does nothing and the
322
    previous result is passed back unchanged but any LU can define it if it
323
    wants to use the local cluster hook-scripts somehow.
324

325
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
326
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
327
    @param hook_results: the results of the multi-node hooks rpc call
328
    @param feedback_fn: function used to send feedback back to the caller
329
    @param lu_result: the previous Exec result this LU had, or None
330
        in the PRE phase
331
    @return: the new Exec result, based on the previous result
332
        and hook results
333

334
    """
335
    # API must be kept, thus we ignore the unused argument and the
336
    # "could be a function" warnings
337
    # pylint: disable=W0613,R0201
338
    return lu_result
339

    
340
  def _ExpandAndLockInstance(self):
341
    """Helper function to expand and lock an instance.
342

343
    Many LUs that work on an instance take its name in self.op.instance_name
344
    and need to expand it and then declare the expanded name for locking. This
345
    function does it, and then updates self.op.instance_name to the expanded
346
    name. It also initializes needed_locks as a dict, if this hasn't been done
347
    before.
348

349
    """
350
    if self.needed_locks is None:
351
      self.needed_locks = {}
352
    else:
353
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
354
        "_ExpandAndLockInstance called with instance-level locks set"
355
    self.op.instance_name = _ExpandInstanceName(self.cfg,
356
                                                self.op.instance_name)
357
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
358

    
359
  def _LockInstancesNodes(self, primary_only=False,
360
                          level=locking.LEVEL_NODE):
361
    """Helper function to declare instances' nodes for locking.
362

363
    This function should be called after locking one or more instances to lock
364
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
365
    with all primary or secondary nodes for instances already locked and
366
    present in self.needed_locks[locking.LEVEL_INSTANCE].
367

368
    It should be called from DeclareLocks, and for safety only works if
369
    self.recalculate_locks[locking.LEVEL_NODE] is set.
370

371
    In the future it may grow parameters to just lock some instance's nodes, or
372
    to just lock primary or secondary nodes, if needed.
373

374
    It should be called in DeclareLocks in a way similar to::
375

376
      if level == locking.LEVEL_NODE:
377
        self._LockInstancesNodes()
378

379
    @type primary_only: boolean
380
    @param primary_only: only lock primary nodes of locked instances
381
    @param level: Which lock level to use for locking nodes
382

383
    """
384
    assert level in self.recalculate_locks, \
385
      "_LockInstancesNodes helper function called with no nodes to recalculate"
386

    
387
    # TODO: check if we've really been called with the instance locks held
388

    
389
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
390
    # future we might want to have different behaviors depending on the value
391
    # of self.recalculate_locks[locking.LEVEL_NODE]
392
    wanted_nodes = []
393
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
394
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
395
      wanted_nodes.append(instance.primary_node)
396
      if not primary_only:
397
        wanted_nodes.extend(instance.secondary_nodes)
398

    
399
    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
400
      self.needed_locks[level] = wanted_nodes
401
    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
402
      self.needed_locks[level].extend(wanted_nodes)
403
    else:
404
      raise errors.ProgrammerError("Unknown recalculation mode")
405

    
406
    del self.recalculate_locks[level]
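

# Illustrative sketch (editor example, not an LU shipped in this module): the
# typical pattern for locking an instance in ExpandNames and its nodes in
# DeclareLocks, as described in the docstrings above. The class name and the
# absence of a matching opcode are placeholders only.
class _LUExampleInstanceLocking(LogicalUnit): # pylint: disable=W0223
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks can only be computed once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()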
407

    
408

    
409
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
410
  """Simple LU which runs no hooks.
411

412
  This LU is intended as a parent for other LogicalUnits which will
413
  run no hooks, in order to reduce duplicate code.
414

415
  """
416
  HPATH = None
417
  HTYPE = None
418

    
419
  def BuildHooksEnv(self):
420
    """Empty BuildHooksEnv for NoHooksLu.
421

422
    This just raises an error.
423

424
    """
425
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
426

    
427
  def BuildHooksNodes(self):
428
    """Empty BuildHooksNodes for NoHooksLU.
429

430
    """
431
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
432

    
433

    
434
class Tasklet:
435
  """Tasklet base class.
436

437
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
438
  they can mix legacy code with tasklets. Locking needs to be done in the LU;
439
  tasklets know nothing about locks.
440

441
  Subclasses must follow these rules:
442
    - Implement CheckPrereq
443
    - Implement Exec
444

445
  """
446
  def __init__(self, lu):
447
    self.lu = lu
448

    
449
    # Shortcuts
450
    self.cfg = lu.cfg
451
    self.rpc = lu.rpc
452

    
453
  def CheckPrereq(self):
454
    """Check prerequisites for this tasklets.
455

456
    This method should check whether the prerequisites for the execution of
457
    this tasklet are fulfilled. It can do internode communication, but it
458
    should be idempotent - no cluster or system changes are allowed.
459

460
    The method should raise errors.OpPrereqError in case something is not
461
    fulfilled. Its return value is ignored.
462

463
    This method should also update all parameters to their canonical form if it
464
    hasn't been done before.
465

466
    """
467
    pass
468

    
469
  def Exec(self, feedback_fn):
470
    """Execute the tasklet.
471

472
    This method should implement the actual work. It should raise
473
    errors.OpExecError for failures that are somewhat dealt with in code, or
474
    expected.
475

476
    """
477
    raise NotImplementedError
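

# Illustrative sketch (editor example): a minimal tasklet and the way an LU
# would chain it. Once self.tasklets is set in ExpandNames, the generic
# LogicalUnit.CheckPrereq and LogicalUnit.Exec above iterate over the list.
class _ExampleNoopTasklet(Tasklet):
  """Tasklet that only emits a feedback message (editor example).

  """
  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    feedback_fn("noop tasklet executed")

# Inside some LU's ExpandNames one would then write:
#   self.tasklets = [_ExampleNoopTasklet(self)]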
478

    
479

    
480
class _QueryBase:
481
  """Base for query utility classes.
482

483
  """
484
  #: Attribute holding field definitions
485
  FIELDS = None
486

    
487
  def __init__(self, qfilter, fields, use_locking):
488
    """Initializes this class.
489

490
    """
491
    self.use_locking = use_locking
492

    
493
    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
494
                             namefield="name")
495
    self.requested_data = self.query.RequestedData()
496
    self.names = self.query.RequestedNames()
497

    
498
    # Sort only if no names were requested
499
    self.sort_by_name = not self.names
500

    
501
    self.do_locking = None
502
    self.wanted = None
503

    
504
  def _GetNames(self, lu, all_names, lock_level):
505
    """Helper function to determine names asked for in the query.
506

507
    """
508
    if self.do_locking:
509
      names = lu.owned_locks(lock_level)
510
    else:
511
      names = all_names
512

    
513
    if self.wanted == locking.ALL_SET:
514
      assert not self.names
515
      # caller didn't specify names, so ordering is not important
516
      return utils.NiceSort(names)
517

    
518
    # caller specified names and we must keep the same order
519
    assert self.names
520
    assert not self.do_locking or lu.glm.is_owned(lock_level)
521

    
522
    missing = set(self.wanted).difference(names)
523
    if missing:
524
      raise errors.OpExecError("Some items were removed before retrieving"
525
                               " their data: %s" % missing)
526

    
527
    # Return expanded names
528
    return self.wanted
529

    
530
  def ExpandNames(self, lu):
531
    """Expand names for this query.
532

533
    See L{LogicalUnit.ExpandNames}.
534

535
    """
536
    raise NotImplementedError()
537

    
538
  def DeclareLocks(self, lu, level):
539
    """Declare locks for this query.
540

541
    See L{LogicalUnit.DeclareLocks}.
542

543
    """
544
    raise NotImplementedError()
545

    
546
  def _GetQueryData(self, lu):
547
    """Collects all data for this query.
548

549
    @return: Query data object
550

551
    """
552
    raise NotImplementedError()
553

    
554
  def NewStyleQuery(self, lu):
555
    """Collect data and execute query.
556

557
    """
558
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
559
                                  sort_by_name=self.sort_by_name)
560

    
561
  def OldStyleQuery(self, lu):
562
    """Collect data and execute query.
563

564
    """
565
    return self.query.OldStyleQuery(self._GetQueryData(lu),
566
                                    sort_by_name=self.sort_by_name)
567

    
568

    
569
def _ShareAll():
570
  """Returns a dict declaring all lock levels shared.
571

572
  """
573
  return dict.fromkeys(locking.LEVELS, 1)
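
# Illustrative usage (editor example): a read-only LU can acquire everything
# it declares in shared mode:
#
#   def ExpandNames(self):
#     self.share_locks = _ShareAll()
#     self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}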
574

    
575

    
576
def _MakeLegacyNodeInfo(data):
577
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
578

579
  Converts the data into a single dictionary. This is fine for most use cases,
580
  but some require information from more than one volume group or hypervisor.
581

582
  """
583
  (bootid, (vg_info, ), (hv_info, )) = data
584

    
585
  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
586
    "bootid": bootid,
587
    })
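
# Illustrative example (editor note; the vg/hv key names below only stand in
# for whatever call_node_info returned):
#
#   _MakeLegacyNodeInfo(("boot-id",
#                        ({"vg_size": 10240, "vg_free": 2048}, ),
#                        ({"memory_total": 4096}, )))
#   => {"vg_size": 10240, "vg_free": 2048, "memory_total": 4096,
#       "bootid": "boot-id"}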
588

    
589

    
590
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
591
  """Checks if the owned node groups are still correct for an instance.
592

593
  @type cfg: L{config.ConfigWriter}
594
  @param cfg: The cluster configuration
595
  @type instance_name: string
596
  @param instance_name: Instance name
597
  @type owned_groups: set or frozenset
598
  @param owned_groups: List of currently owned node groups
599

600
  """
601
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
602

    
603
  if not owned_groups.issuperset(inst_groups):
604
    raise errors.OpPrereqError("Instance %s's node groups changed since"
605
                               " locks were acquired, current groups are"
606
                               " are '%s', owning groups '%s'; retry the"
607
                               " operation" %
608
                               (instance_name,
609
                                utils.CommaJoin(inst_groups),
610
                                utils.CommaJoin(owned_groups)),
611
                               errors.ECODE_STATE)
612

    
613
  return inst_groups
614

    
615

    
616
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
617
  """Checks if the instances in a node group are still correct.
618

619
  @type cfg: L{config.ConfigWriter}
620
  @param cfg: The cluster configuration
621
  @type group_uuid: string
622
  @param group_uuid: Node group UUID
623
  @type owned_instances: set or frozenset
624
  @param owned_instances: List of currently owned instances
625

626
  """
627
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
628
  if owned_instances != wanted_instances:
629
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
630
                               " locks were acquired, wanted '%s', have '%s';"
631
                               " retry the operation" %
632
                               (group_uuid,
633
                                utils.CommaJoin(wanted_instances),
634
                                utils.CommaJoin(owned_instances)),
635
                               errors.ECODE_STATE)
636

    
637
  return wanted_instances
638

    
639

    
640
def _SupportsOob(cfg, node):
641
  """Tells if node supports OOB.
642

643
  @type cfg: L{config.ConfigWriter}
644
  @param cfg: The cluster configuration
645
  @type node: L{objects.Node}
646
  @param node: The node
647
  @return: The OOB script if supported or an empty string otherwise
648

649
  """
650
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
651

    
652

    
653
def _GetWantedNodes(lu, nodes):
654
  """Returns list of checked and expanded node names.
655

656
  @type lu: L{LogicalUnit}
657
  @param lu: the logical unit on whose behalf we execute
658
  @type nodes: list
659
  @param nodes: list of node names or None for all nodes
660
  @rtype: list
661
  @return: the list of nodes, sorted
662
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
663

664
  """
665
  if nodes:
666
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
667

    
668
  return utils.NiceSort(lu.cfg.GetNodeList())
669

    
670

    
671
def _GetWantedInstances(lu, instances):
672
  """Returns list of checked and expanded instance names.
673

674
  @type lu: L{LogicalUnit}
675
  @param lu: the logical unit on whose behalf we execute
676
  @type instances: list
677
  @param instances: list of instance names or None for all instances
678
  @rtype: list
679
  @return: the list of instances, sorted
680
  @raise errors.OpPrereqError: if the instances parameter is wrong type
681
  @raise errors.OpPrereqError: if any of the passed instances is not found
682

683
  """
684
  if instances:
685
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
686
  else:
687
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
688
  return wanted
689

    
690

    
691
def _GetUpdatedParams(old_params, update_dict,
692
                      use_default=True, use_none=False):
693
  """Return the new version of a parameter dictionary.
694

695
  @type old_params: dict
696
  @param old_params: old parameters
697
  @type update_dict: dict
698
  @param update_dict: dict containing new parameter values, or
699
      constants.VALUE_DEFAULT to reset the parameter to its default
700
      value
701
  @type use_default: boolean
702
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
703
      values as 'to be deleted' values
704
  @type use_none: boolean
705
  @param use_none: whether to recognise C{None} values as 'to be
706
      deleted' values
707
  @rtype: dict
708
  @return: the new parameter dictionary
709

710
  """
711
  params_copy = copy.deepcopy(old_params)
712
  for key, val in update_dict.iteritems():
713
    if ((use_default and val == constants.VALUE_DEFAULT) or
714
        (use_none and val is None)):
715
      try:
716
        del params_copy[key]
717
      except KeyError:
718
        pass
719
    else:
720
      params_copy[key] = val
721
  return params_copy
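
# Illustrative example (editor note) of the merge semantics above:
#
#   _GetUpdatedParams({"a": 1, "b": 2},
#                     {"a": constants.VALUE_DEFAULT, "b": 3, "c": 4})
#   => {"b": 3, "c": 4}   # "a" reset to its default (dropped), "b" updated,
#                         # "c" added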
722

    
723

    
724
def _ReleaseLocks(lu, level, names=None, keep=None):
725
  """Releases locks owned by an LU.
726

727
  @type lu: L{LogicalUnit}
  @param lu: the logical unit whose locks are to be released
728
  @param level: Lock level
729
  @type names: list or None
730
  @param names: Names of locks to release
731
  @type keep: list or None
732
  @param keep: Names of locks to retain
733

734
  """
735
  assert not (keep is not None and names is not None), \
736
         "Only one of the 'names' and the 'keep' parameters can be given"
737

    
738
  if names is not None:
739
    should_release = names.__contains__
740
  elif keep:
741
    should_release = lambda name: name not in keep
742
  else:
743
    should_release = None
744

    
745
  owned = lu.owned_locks(level)
746
  if not owned:
747
    # Not owning any lock at this level, do nothing
748
    pass
749

    
750
  elif should_release:
751
    retain = []
752
    release = []
753

    
754
    # Determine which locks to release
755
    for name in owned:
756
      if should_release(name):
757
        release.append(name)
758
      else:
759
        retain.append(name)
760

    
761
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
762

    
763
    # Release just some locks
764
    lu.glm.release(level, names=release)
765

    
766
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
767
  else:
768
    # Release everything
769
    lu.glm.release(level)
770

    
771
    assert not lu.glm.is_owned(level), "No locks should be owned"
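
# Illustrative usage (editor example): once an LU has narrowed down which
# nodes it really needs, it typically drops the remaining node locks:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[instance.primary_node] +
#                      list(instance.secondary_nodes))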
772

    
773

    
774
def _MapInstanceDisksToNodes(instances):
775
  """Creates a map from (node, volume) to instance name.
776

777
  @type instances: list of L{objects.Instance}
778
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
779

780
  """
781
  return dict(((node, vol), inst.name)
782
              for inst in instances
783
              for (node, vols) in inst.MapLVsByNode().items()
784
              for vol in vols)
785

    
786

    
787
def _RunPostHook(lu, node_name):
788
  """Runs the post-hook for an opcode on a single node.
789

790
  """
791
  hm = lu.proc.BuildHooksManager(lu)
792
  try:
793
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
794
  except:
795
    # pylint: disable=W0702
796
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
797

    
798

    
799
def _CheckOutputFields(static, dynamic, selected):
800
  """Checks whether all selected fields are valid.
801

802
  @type static: L{utils.FieldSet}
803
  @param static: static fields set
804
  @type dynamic: L{utils.FieldSet}
805
  @param dynamic: dynamic fields set
806

807
  """
808
  f = utils.FieldSet()
809
  f.Extend(static)
810
  f.Extend(dynamic)
811

    
812
  delta = f.NonMatching(selected)
813
  if delta:
814
    raise errors.OpPrereqError("Unknown output fields selected: %s"
815
                               % ",".join(delta), errors.ECODE_INVAL)
816

    
817

    
818
def _CheckGlobalHvParams(params):
819
  """Validates that given hypervisor params are not global ones.
820

821
  This will ensure that instances don't get customised versions of
822
  global params.
823

824
  """
825
  used_globals = constants.HVC_GLOBALS.intersection(params)
826
  if used_globals:
827
    msg = ("The following hypervisor parameters are global and cannot"
828
           " be customized at instance level, please modify them at"
829
           " cluster level: %s" % utils.CommaJoin(used_globals))
830
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
831

    
832

    
833
def _CheckNodeOnline(lu, node, msg=None):
834
  """Ensure that a given node is online.
835

836
  @param lu: the LU on behalf of which we make the check
837
  @param node: the node to check
838
  @param msg: if passed, should be a message to replace the default one
839
  @raise errors.OpPrereqError: if the node is offline
840

841
  """
842
  if msg is None:
843
    msg = "Can't use offline node"
844
  if lu.cfg.GetNodeInfo(node).offline:
845
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
846

    
847

    
848
def _CheckNodeNotDrained(lu, node):
849
  """Ensure that a given node is not drained.
850

851
  @param lu: the LU on behalf of which we make the check
852
  @param node: the node to check
853
  @raise errors.OpPrereqError: if the node is drained
854

855
  """
856
  if lu.cfg.GetNodeInfo(node).drained:
857
    raise errors.OpPrereqError("Can't use drained node %s" % node,
858
                               errors.ECODE_STATE)
859

    
860

    
861
def _CheckNodeVmCapable(lu, node):
862
  """Ensure that a given node is vm capable.
863

864
  @param lu: the LU on behalf of which we make the check
865
  @param node: the node to check
866
  @raise errors.OpPrereqError: if the node is not vm capable
867

868
  """
869
  if not lu.cfg.GetNodeInfo(node).vm_capable:
870
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
871
                               errors.ECODE_STATE)
872

    
873

    
874
def _CheckNodeHasOS(lu, node, os_name, force_variant):
875
  """Ensure that a node supports a given OS.
876

877
  @param lu: the LU on behalf of which we make the check
878
  @param node: the node to check
879
  @param os_name: the OS to query about
880
  @param force_variant: whether to ignore variant errors
881
  @raise errors.OpPrereqError: if the node is not supporting the OS
882

883
  """
884
  result = lu.rpc.call_os_get(node, os_name)
885
  result.Raise("OS '%s' not in supported OS list for node %s" %
886
               (os_name, node),
887
               prereq=True, ecode=errors.ECODE_INVAL)
888
  if not force_variant:
889
    _CheckOSVariant(result.payload, os_name)
890

    
891

    
892
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
893
  """Ensure that a node has the given secondary ip.
894

895
  @type lu: L{LogicalUnit}
896
  @param lu: the LU on behalf of which we make the check
897
  @type node: string
898
  @param node: the node to check
899
  @type secondary_ip: string
900
  @param secondary_ip: the ip to check
901
  @type prereq: boolean
902
  @param prereq: whether to throw a prerequisite or an execute error
903
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
904
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
905

906
  """
907
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
908
  result.Raise("Failure checking secondary ip on node %s" % node,
909
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
910
  if not result.payload:
911
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
912
           " please fix and re-run this command" % secondary_ip)
913
    if prereq:
914
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
915
    else:
916
      raise errors.OpExecError(msg)
917

    
918

    
919
def _GetClusterDomainSecret():
920
  """Reads the cluster domain secret.
921

922
  """
923
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
924
                               strict=True)
925

    
926

    
927
def _CheckInstanceState(lu, instance, req_states, msg=None):
928
  """Ensure that an instance is in one of the required states.
929

930
  @param lu: the LU on behalf of which we make the check
931
  @param instance: the instance to check
932
  @param msg: if passed, should be a message to replace the default one
933
  @raise errors.OpPrereqError: if the instance is not in the required state
934

935
  """
936
  if msg is None:
937
    msg = "can't use instance from outside %s states" % ", ".join(req_states)
938
  if instance.admin_state not in req_states:
939
    raise errors.OpPrereqError("Instance %s is marked to be %s, %s" %
940
                               (instance, instance.admin_state, msg),
941
                               errors.ECODE_STATE)
942

    
943
  if constants.ADMINST_UP not in req_states:
944
    pnode = instance.primary_node
945
    ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
946
    ins_l.Raise("Can't contact node %s for instance information" % pnode,
947
                prereq=True, ecode=errors.ECODE_ENVIRON)
948

    
949
    if instance.name in ins_l.payload:
950
      raise errors.OpPrereqError("Instance %s is running, %s" %
951
                                 (instance.name, msg), errors.ECODE_STATE)
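
# Illustrative usage (editor example), using the admin-state lists defined at
# the top of this module:
#
#   _CheckInstanceState(self, instance, INSTANCE_ONLINE)
#   _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
#                       msg="cannot recreate disks")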
952

    
953

    
954
def _ExpandItemName(fn, name, kind):
955
  """Expand an item name.
956

957
  @param fn: the function to use for expansion
958
  @param name: requested item name
959
  @param kind: text description ('Node' or 'Instance')
960
  @return: the resolved (full) name
961
  @raise errors.OpPrereqError: if the item is not found
962

963
  """
964
  full_name = fn(name)
965
  if full_name is None:
966
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
967
                               errors.ECODE_NOENT)
968
  return full_name
969

    
970

    
971
def _ExpandNodeName(cfg, name):
972
  """Wrapper over L{_ExpandItemName} for nodes."""
973
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
974

    
975

    
976
def _ExpandInstanceName(cfg, name):
977
  """Wrapper over L{_ExpandItemName} for instance."""
978
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
979

    
980

    
981
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
982
                          minmem, maxmem, vcpus, nics, disk_template, disks,
983
                          bep, hvp, hypervisor_name, tags):
984
  """Builds instance related env variables for hooks
985

986
  This builds the hook environment from individual variables.
987

988
  @type name: string
989
  @param name: the name of the instance
990
  @type primary_node: string
991
  @param primary_node: the name of the instance's primary node
992
  @type secondary_nodes: list
993
  @param secondary_nodes: list of secondary nodes as strings
994
  @type os_type: string
995
  @param os_type: the name of the instance's OS
996
  @type status: string
997
  @param status: the desired status of the instance
998
  @type minmem: string
999
  @param minmem: the minimum memory size of the instance
1000
  @type maxmem: string
1001
  @param maxmem: the maximum memory size of the instance
1002
  @type vcpus: string
1003
  @param vcpus: the count of VCPUs the instance has
1004
  @type nics: list
1005
  @param nics: list of tuples (ip, mac, mode, link) representing
1006
      the NICs the instance has
1007
  @type disk_template: string
1008
  @param disk_template: the disk template of the instance
1009
  @type disks: list
1010
  @param disks: the list of (size, mode) pairs
1011
  @type bep: dict
1012
  @param bep: the backend parameters for the instance
1013
  @type hvp: dict
1014
  @param hvp: the hypervisor parameters for the instance
1015
  @type hypervisor_name: string
1016
  @param hypervisor_name: the hypervisor for the instance
1017
  @type tags: list
1018
  @param tags: list of instance tags as strings
1019
  @rtype: dict
1020
  @return: the hook environment for this instance
1021

1022
  """
1023
  env = {
1024
    "OP_TARGET": name,
1025
    "INSTANCE_NAME": name,
1026
    "INSTANCE_PRIMARY": primary_node,
1027
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
1028
    "INSTANCE_OS_TYPE": os_type,
1029
    "INSTANCE_STATUS": status,
1030
    "INSTANCE_MINMEM": minmem,
1031
    "INSTANCE_MAXMEM": maxmem,
1032
    # TODO(2.7) remove deprecated "memory" value
1033
    "INSTANCE_MEMORY": maxmem,
1034
    "INSTANCE_VCPUS": vcpus,
1035
    "INSTANCE_DISK_TEMPLATE": disk_template,
1036
    "INSTANCE_HYPERVISOR": hypervisor_name,
1037
  }
1038
  if nics:
1039
    nic_count = len(nics)
1040
    for idx, (ip, mac, mode, link) in enumerate(nics):
1041
      if ip is None:
1042
        ip = ""
1043
      env["INSTANCE_NIC%d_IP" % idx] = ip
1044
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1045
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1046
      env["INSTANCE_NIC%d_LINK" % idx] = link
1047
      if mode == constants.NIC_MODE_BRIDGED:
1048
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1049
  else:
1050
    nic_count = 0
1051

    
1052
  env["INSTANCE_NIC_COUNT"] = nic_count
1053

    
1054
  if disks:
1055
    disk_count = len(disks)
1056
    for idx, (size, mode) in enumerate(disks):
1057
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1058
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1059
  else:
1060
    disk_count = 0
1061

    
1062
  env["INSTANCE_DISK_COUNT"] = disk_count
1063

    
1064
  if not tags:
1065
    tags = []
1066

    
1067
  env["INSTANCE_TAGS"] = " ".join(tags)
1068

    
1069
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1070
    for key, value in source.items():
1071
      env["INSTANCE_%s_%s" % (kind, key)] = value
1072

    
1073
  return env
1074

    
1075

    
1076
def _NICListToTuple(lu, nics):
1077
  """Build a list of nic information tuples.
1078

1079
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1080
  value in LUInstanceQueryData.
1081

1082
  @type lu:  L{LogicalUnit}
1083
  @param lu: the logical unit on whose behalf we execute
1084
  @type nics: list of L{objects.NIC}
1085
  @param nics: list of nics to convert to hooks tuples
1086

1087
  """
1088
  hooks_nics = []
1089
  cluster = lu.cfg.GetClusterInfo()
1090
  for nic in nics:
1091
    ip = nic.ip
1092
    mac = nic.mac
1093
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1094
    mode = filled_params[constants.NIC_MODE]
1095
    link = filled_params[constants.NIC_LINK]
1096
    hooks_nics.append((ip, mac, mode, link))
1097
  return hooks_nics
1098

    
1099

    
1100
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1101
  """Builds instance related env variables for hooks from an object.
1102

1103
  @type lu: L{LogicalUnit}
1104
  @param lu: the logical unit on whose behalf we execute
1105
  @type instance: L{objects.Instance}
1106
  @param instance: the instance for which we should build the
1107
      environment
1108
  @type override: dict
1109
  @param override: dictionary with key/values that will override
1110
      our values
1111
  @rtype: dict
1112
  @return: the hook environment dictionary
1113

1114
  """
1115
  cluster = lu.cfg.GetClusterInfo()
1116
  bep = cluster.FillBE(instance)
1117
  hvp = cluster.FillHV(instance)
1118
  args = {
1119
    "name": instance.name,
1120
    "primary_node": instance.primary_node,
1121
    "secondary_nodes": instance.secondary_nodes,
1122
    "os_type": instance.os,
1123
    "status": instance.admin_state,
1124
    "maxmem": bep[constants.BE_MAXMEM],
1125
    "minmem": bep[constants.BE_MINMEM],
1126
    "vcpus": bep[constants.BE_VCPUS],
1127
    "nics": _NICListToTuple(lu, instance.nics),
1128
    "disk_template": instance.disk_template,
1129
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1130
    "bep": bep,
1131
    "hvp": hvp,
1132
    "hypervisor_name": instance.hypervisor,
1133
    "tags": instance.tags,
1134
  }
1135
  if override:
1136
    args.update(override)
1137
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1138

    
1139

    
1140
def _AdjustCandidatePool(lu, exceptions):
1141
  """Adjust the candidate pool after node operations.
1142

1143
  """
1144
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1145
  if mod_list:
1146
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1147
               utils.CommaJoin(node.name for node in mod_list))
1148
    for name in mod_list:
1149
      lu.context.ReaddNode(name)
1150
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1151
  if mc_now > mc_max:
1152
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1153
               (mc_now, mc_max))
1154

    
1155

    
1156
def _DecideSelfPromotion(lu, exceptions=None):
1157
  """Decide whether I should promote myself as a master candidate.
1158

1159
  """
1160
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1161
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1162
  # the new node will increase mc_max with one, so:
1163
  mc_should = min(mc_should + 1, cp_size)
1164
  return mc_now < mc_should
1165

    
1166

    
1167
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1168
  """Check that the brigdes needed by a list of nics exist.
1169

1170
  """
1171
  cluster = lu.cfg.GetClusterInfo()
1172
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1173
  brlist = [params[constants.NIC_LINK] for params in paramslist
1174
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1175
  if brlist:
1176
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1177
    result.Raise("Error checking bridges on destination node '%s'" %
1178
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1179

    
1180

    
1181
def _CheckInstanceBridgesExist(lu, instance, node=None):
1182
  """Check that the brigdes needed by an instance exist.
1183

1184
  """
1185
  if node is None:
1186
    node = instance.primary_node
1187
  _CheckNicsBridgesExist(lu, instance.nics, node)
1188

    
1189

    
1190
def _CheckOSVariant(os_obj, name):
1191
  """Check whether an OS name conforms to the os variants specification.
1192

1193
  @type os_obj: L{objects.OS}
1194
  @param os_obj: OS object to check
1195
  @type name: string
1196
  @param name: OS name passed by the user, to check for validity
1197

1198
  """
1199
  variant = objects.OS.GetVariant(name)
1200
  if not os_obj.supported_variants:
1201
    if variant:
1202
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1203
                                 " passed)" % (os_obj.name, variant),
1204
                                 errors.ECODE_INVAL)
1205
    return
1206
  if not variant:
1207
    raise errors.OpPrereqError("OS name must include a variant",
1208
                               errors.ECODE_INVAL)
1209

    
1210
  if variant not in os_obj.supported_variants:
1211
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1212

    
1213

    
1214
def _GetNodeInstancesInner(cfg, fn):
1215
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1216

    
1217

    
1218
def _GetNodeInstances(cfg, node_name):
1219
  """Returns a list of all primary and secondary instances on a node.
1220

1221
  """
1222

    
1223
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1224

    
1225

    
1226
def _GetNodePrimaryInstances(cfg, node_name):
1227
  """Returns primary instances on a node.
1228

1229
  """
1230
  return _GetNodeInstancesInner(cfg,
1231
                                lambda inst: node_name == inst.primary_node)
1232

    
1233

    
1234
def _GetNodeSecondaryInstances(cfg, node_name):
1235
  """Returns secondary instances on a node.
1236

1237
  """
1238
  return _GetNodeInstancesInner(cfg,
1239
                                lambda inst: node_name in inst.secondary_nodes)
1240

    
1241

    
1242
def _GetStorageTypeArgs(cfg, storage_type):
1243
  """Returns the arguments for a storage type.
1244

1245
  """
1246
  # Special case for file storage
1247
  if storage_type == constants.ST_FILE:
1248
    # storage.FileStorage wants a list of storage directories
1249
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1250

    
1251
  return []
1252

    
1253

    
1254
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1255
  faulty = []
1256

    
1257
  for dev in instance.disks:
1258
    cfg.SetDiskID(dev, node_name)
1259

    
1260
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
1261
  result.Raise("Failed to get disk status from node %s" % node_name,
1262
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1263

    
1264
  for idx, bdev_status in enumerate(result.payload):
1265
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1266
      faulty.append(idx)
1267

    
1268
  return faulty
1269

    
1270

    
1271
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1272
  """Check the sanity of iallocator and node arguments and use the
1273
  cluster-wide iallocator if appropriate.
1274

1275
  Check that at most one of (iallocator, node) is specified. If none is
1276
  specified, then the LU's opcode's iallocator slot is filled with the
1277
  cluster-wide default iallocator.
1278

1279
  @type iallocator_slot: string
1280
  @param iallocator_slot: the name of the opcode iallocator slot
1281
  @type node_slot: string
1282
  @param node_slot: the name of the opcode target node slot
1283

1284
  """
1285
  node = getattr(lu.op, node_slot, None)
1286
  iallocator = getattr(lu.op, iallocator_slot, None)
1287

    
1288
  if node is not None and iallocator is not None:
1289
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1290
                               errors.ECODE_INVAL)
1291
  elif node is None and iallocator is None:
1292
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1293
    if default_iallocator:
1294
      setattr(lu.op, iallocator_slot, default_iallocator)
1295
    else:
1296
      raise errors.OpPrereqError("No iallocator or node given and no"
1297
                                 " cluster-wide default iallocator found;"
1298
                                 " please specify either an iallocator or a"
1299
                                 " node, or set a cluster-wide default"
1300
                                 " iallocator")
1301

    
1302

    
1303
def _GetDefaultIAllocator(cfg, iallocator):
1304
  """Decides on which iallocator to use.
1305

1306
  @type cfg: L{config.ConfigWriter}
1307
  @param cfg: Cluster configuration object
1308
  @type iallocator: string or None
1309
  @param iallocator: Iallocator specified in opcode
1310
  @rtype: string
1311
  @return: Iallocator name
1312

1313
  """
1314
  if not iallocator:
1315
    # Use default iallocator
1316
    iallocator = cfg.GetDefaultIAllocator()
1317

    
1318
  if not iallocator:
1319
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1320
                               " opcode nor as a cluster-wide default",
1321
                               errors.ECODE_INVAL)
1322

    
1323
  return iallocator
1324

    
1325

    
1326
class LUClusterPostInit(LogicalUnit):
1327
  """Logical unit for running hooks after cluster initialization.
1328

1329
  """
1330
  HPATH = "cluster-init"
1331
  HTYPE = constants.HTYPE_CLUSTER
1332

    
1333
  def BuildHooksEnv(self):
1334
    """Build hooks env.
1335

1336
    """
1337
    return {
1338
      "OP_TARGET": self.cfg.GetClusterName(),
1339
      }
1340

    
1341
  def BuildHooksNodes(self):
1342
    """Build hooks nodes.
1343

1344
    """
1345
    return ([], [self.cfg.GetMasterNode()])
1346

    
1347
  def Exec(self, feedback_fn):
1348
    """Nothing to do.
1349

1350
    """
1351
    return True
1352

    
1353

    
1354
class LUClusterDestroy(LogicalUnit):
1355
  """Logical unit for destroying the cluster.
1356

1357
  """
1358
  HPATH = "cluster-destroy"
1359
  HTYPE = constants.HTYPE_CLUSTER
1360

    
1361
  def BuildHooksEnv(self):
1362
    """Build hooks env.
1363

1364
    """
1365
    return {
1366
      "OP_TARGET": self.cfg.GetClusterName(),
1367
      }
1368

    
1369
  def BuildHooksNodes(self):
1370
    """Build hooks nodes.
1371

1372
    """
1373
    return ([], [])
1374

    
1375
  def CheckPrereq(self):
1376
    """Check prerequisites.
1377

1378
    This checks whether the cluster is empty.
1379

1380
    Any errors are signaled by raising errors.OpPrereqError.
1381

1382
    """
1383
    master = self.cfg.GetMasterNode()
1384

    
1385
    nodelist = self.cfg.GetNodeList()
1386
    if len(nodelist) != 1 or nodelist[0] != master:
1387
      raise errors.OpPrereqError("There are still %d node(s) in"
1388
                                 " this cluster." % (len(nodelist) - 1),
1389
                                 errors.ECODE_INVAL)
1390
    instancelist = self.cfg.GetInstanceList()
1391
    if instancelist:
1392
      raise errors.OpPrereqError("There are still %d instance(s) in"
1393
                                 " this cluster." % len(instancelist),
1394
                                 errors.ECODE_INVAL)
1395

    
1396
  def Exec(self, feedback_fn):
1397
    """Destroys the cluster.
1398

1399
    """
1400
    master_params = self.cfg.GetMasterNetworkParameters()
1401

    
1402
    # Run post hooks on master node before it's removed
1403
    _RunPostHook(self, master_params.name)
1404

    
1405
    ems = self.cfg.GetUseExternalMipScript()
1406
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1407
                                                     master_params, ems)
1408
    result.Raise("Could not disable the master role")
1409

    
1410
    return master_params.name
1411

    
1412

    
1413
def _VerifyCertificate(filename):
1414
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1415

1416
  @type filename: string
1417
  @param filename: Path to PEM file
1418

1419
  """
1420
  try:
1421
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1422
                                           utils.ReadFile(filename))
1423
  except Exception, err: # pylint: disable=W0703
1424
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1425
            "Failed to load X509 certificate %s: %s" % (filename, err))
1426

    
1427
  (errcode, msg) = \
1428
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1429
                                constants.SSL_CERT_EXPIRATION_ERROR)
1430

    
1431
  if msg:
1432
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1433
  else:
1434
    fnamemsg = None
1435

    
1436
  if errcode is None:
1437
    return (None, fnamemsg)
1438
  elif errcode == utils.CERT_WARNING:
1439
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1440
  elif errcode == utils.CERT_ERROR:
1441
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1442

    
1443
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1444

    
1445

    
1446
def _GetAllHypervisorParameters(cluster, instances):
1447
  """Compute the set of all hypervisor parameters.
1448

1449
  @type cluster: L{objects.Cluster}
1450
  @param cluster: the cluster object
1451
  @type instances: list of L{objects.Instance}
1452
  @param instances: additional instances from which to obtain parameters
1453
  @rtype: list of (origin, hypervisor, parameters)
1454
  @return: a list with all parameters found, indicating the hypervisor they
1455
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1456

1457
  """
1458
  hvp_data = []
1459

    
1460
  for hv_name in cluster.enabled_hypervisors:
1461
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1462

    
1463
  for os_name, os_hvp in cluster.os_hvp.items():
1464
    for hv_name, hv_params in os_hvp.items():
1465
      if hv_params:
1466
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1467
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1468

    
1469
  # TODO: collapse identical parameter values in a single one
1470
  for instance in instances:
1471
    if instance.hvparams:
1472
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1473
                       cluster.FillHV(instance)))
1474

    
1475
  return hvp_data
1476

    
1477

    
1478
class _VerifyErrors(object):
1479
  """Mix-in for cluster/group verify LUs.
1480

1481
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1482
  self.op and self._feedback_fn to be available.)
1483

1484
  """
1485

    
1486
  ETYPE_FIELD = "code"
1487
  ETYPE_ERROR = "ERROR"
1488
  ETYPE_WARNING = "WARNING"
1489

    
1490
  def _Error(self, ecode, item, msg, *args, **kwargs):
1491
    """Format an error message.
1492

1493
    Based on the opcode's error_codes parameter, either format a
1494
    parseable error code, or a simpler error string.
1495

1496
    This must be called only from Exec and functions called from Exec.
1497

1498
    """
1499
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1500
    itype, etxt, _ = ecode
1501
    # first complete the msg
1502
    if args:
1503
      msg = msg % args
1504
    # then format the whole message
1505
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1506
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1507
    else:
1508
      if item:
1509
        item = " " + item
1510
      else:
1511
        item = ""
1512
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1513
    # and finally report it via the feedback_fn
1514
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1515

    
1516
  def _ErrorIf(self, cond, ecode, *args, **kwargs):
1517
    """Log an error message if the passed condition is True.
1518

1519
    """
1520
    cond = (bool(cond)
1521
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1522

    
1523
    # If the error code is in the list of ignored errors, demote the error to a
1524
    # warning
1525
    (_, etxt, _) = ecode
1526
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1527
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1528

    
1529
    if cond:
1530
      self._Error(ecode, *args, **kwargs)
1531

    
1532
    # do not mark the operation as failed for WARN cases only
1533
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1534
      self.bad = self.bad or cond
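
# Illustrative usage (editor example) from inside a verify LU's Exec
# (config_errors here is just a placeholder variable):
#
#   self._ErrorIf(bool(config_errors), constants.CV_ECLUSTERCFG, None,
#                 "Cluster configuration verification failed: %s",
#                 utils.CommaJoin(config_errors))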
1535

    
1536

    
1537
class LUClusterVerify(NoHooksLU):
1538
  """Submits all jobs necessary to verify the cluster.
1539

1540
  """
1541
  REQ_BGL = False
1542

    
1543
  def ExpandNames(self):
1544
    self.needed_locks = {}
1545

    
1546
  def Exec(self, feedback_fn):
1547
    jobs = []
1548

    
1549
    if self.op.group_name:
1550
      groups = [self.op.group_name]
1551
      depends_fn = lambda: None
1552
    else:
1553
      groups = self.cfg.GetNodeGroupList()
1554

    
1555
      # Verify global configuration
1556
      jobs.append([
1557
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1558
        ])
1559

    
1560
      # Always depend on global verification
1561
      depends_fn = lambda: [(-len(jobs), [])]
1562

    
1563
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
1564
                                            ignore_errors=self.op.ignore_errors,
1565
                                            depends=depends_fn())]
1566
                for group in groups)
1567

    
1568
    # Fix up all parameters
1569
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1570
      op.debug_simulate_errors = self.op.debug_simulate_errors
1571
      op.verbose = self.op.verbose
1572
      op.error_codes = self.op.error_codes
1573
      try:
1574
        op.skip_checks = self.op.skip_checks
1575
      except AttributeError:
1576
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1577

    
1578
    return ResultWithJobs(jobs)
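
  # A minimal sketch of the resulting job list when no group is given
  # (group names are illustrative):
  #   [[OpClusterVerifyConfig(...)],
  #    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])], ...)],
  #    [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])], ...)]]
  # i.e. each per-group job depends, via a relative job ID, on the global
  # configuration verification submitted first.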


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = True

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
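
    # Rough usage sketch (illustrative only; node and instance names are
    # made up):
    #   nimg = self.NodeImage(name="node1.example.com", offline=False)
    #   nimg.pinst.append("inst1.example.com")   # from the cluster config
    #   nimg.mfree = 2048                        # later, from the node RPC
    # Exec() below creates one NodeImage per node and the _UpdateNode*
    # helpers fill in the runtime fields.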

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)
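
  # Example of the "split LV" case handled above (a sketch with made-up
  # names): a DRBD instance whose primary node is in this group but whose
  # secondary lives in another group, e.g.
  #   inst.primary_node    = "node1"    (group being verified)
  #   inst.secondary_nodes = ["node9"]  (different group)
  # causes "node9" to end up in self.extra_lv_nodes so that its LV list can
  # still be queried in Exec().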

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)
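
  # Worked example for the check above (all numbers made up): if the verify
  # RPC ran between t=100.0 and t=102.0 on the master, the node reported
  # t=260.5, and the allowed skew (constants.NODE_MAX_CLOCK_SKEW) were 150
  # seconds, then the first branch does not trigger (260.5 is not below
  # -50.0) but the second one does (260.5 > 252.0), reporting "158.5s".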

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_state == constants.ADMINST_UP:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
               not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      #TODO(dynmem): use MINMEM for checking
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MAXMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))
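
    # Shape of the intermediate structure used above (an illustrative,
    # made-up example): fileinfo maps each filename to a dict of
    # checksum -> set of node names, e.g.
    #   fileinfo["/path/to/some/file"] = {
    #     "0123456789abcdef0123": set(["node1", "node2"]),
    #     "fedcba98765432100123": set(["node3"]),
    #   }
    # Two different checksums for the same file trigger the
    # "different checksums" error reported just above.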
2257

    
2258
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2259
                      drbd_map):
2260
    """Verifies and the node DRBD status.
2261

2262
    @type ninfo: L{objects.Node}
2263
    @param ninfo: the node to check
2264
    @param nresult: the remote results for the node
2265
    @param instanceinfo: the dict of instances
2266
    @param drbd_helper: the configured DRBD usermode helper
2267
    @param drbd_map: the DRBD map as returned by
2268
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2269

2270
    """
2271
    node = ninfo.name
2272
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2273

    
2274
    if drbd_helper:
2275
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2276
      test = (helper_result == None)
2277
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2278
               "no drbd usermode helper returned")
2279
      if helper_result:
2280
        status, payload = helper_result
2281
        test = not status
2282
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2283
                 "drbd usermode helper check unsuccessful: %s", payload)
2284
        test = status and (payload != drbd_helper)
2285
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2286
                 "wrong drbd usermode helper: %s", payload)
2287

    
2288
    # compute the DRBD minors
2289
    node_drbd = {}
2290
    for minor, instance in drbd_map[node].items():
2291
      test = instance not in instanceinfo
2292
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2293
               "ghost instance '%s' in temporary DRBD map", instance)
2294
        # ghost instance should not be running, but otherwise we
2295
        # don't give double warnings (both ghost instance and
2296
        # unallocated minor in use)
2297
      if test:
2298
        node_drbd[minor] = (instance, False)
2299
      else:
2300
        instance = instanceinfo[instance]
2301
        node_drbd[minor] = (instance.name,
2302
                            instance.admin_state == constants.ADMINST_UP)
2303

    
2304
    # and now check them
2305
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2306
    test = not isinstance(used_minors, (tuple, list))
2307
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2308
             "cannot parse drbd status file: %s", str(used_minors))
2309
    if test:
2310
      # we cannot check drbd status
2311
      return
2312

    
2313
    for minor, (iname, must_exist) in node_drbd.items():
2314
      test = minor not in used_minors and must_exist
2315
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2316
               "drbd minor %d of instance %s is not active", minor, iname)
2317
    for minor in used_minors:
2318
      test = minor not in node_drbd
2319
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2320
               "unallocated drbd minor %d is in use", minor)
2321

    
2322
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2323
    """Builds the node OS structures.
2324

2325
    @type ninfo: L{objects.Node}
2326
    @param ninfo: the node to check
2327
    @param nresult: the remote results for the node
2328
    @param nimg: the node image object
2329

2330
    """
2331
    node = ninfo.name
2332
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2333

    
2334
    remote_os = nresult.get(constants.NV_OSLIST, None)
2335
    test = (not isinstance(remote_os, list) or
2336
            not compat.all(isinstance(v, list) and len(v) == 7
2337
                           for v in remote_os))
2338

    
2339
    _ErrorIf(test, constants.CV_ENODEOS, node,
2340
             "node hasn't returned valid OS data")
2341

    
2342
    nimg.os_fail = test
2343

    
2344
    if test:
2345
      return
2346

    
2347
    os_dict = {}
2348

    
2349
    for (name, os_path, status, diagnose,
2350
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2351

    
2352
      if name not in os_dict:
2353
        os_dict[name] = []
2354

    
2355
      # parameters is a list of lists instead of list of tuples due to
2356
      # JSON lacking a real tuple type, fix it:
2357
      parameters = [tuple(v) for v in parameters]
2358
      os_dict[name].append((os_path, status, diagnose,
2359
                            set(variants), set(parameters), set(api_ver)))
2360

    
2361
    nimg.oslist = os_dict
2362

    
2363
  def _VerifyNodeOS(self, ninfo, nimg, base):
2364
    """Verifies the node OS list.
2365

2366
    @type ninfo: L{objects.Node}
2367
    @param ninfo: the node to check
2368
    @param nimg: the node image object
2369
    @param base: the 'template' node we match against (e.g. from the master)
2370

2371
    """
2372
    node = ninfo.name
2373
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2374

    
2375
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2376

    
2377
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2378
    for os_name, os_data in nimg.oslist.items():
2379
      assert os_data, "Empty OS status for OS %s?!" % os_name
2380
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2381
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2382
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2383
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2384
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2385
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2386
      # comparisons with the 'base' image
2387
      test = os_name not in base.oslist
2388
      _ErrorIf(test, constants.CV_ENODEOS, node,
2389
               "Extra OS %s not present on reference node (%s)",
2390
               os_name, base.name)
2391
      if test:
2392
        continue
2393
      assert base.oslist[os_name], "Base node has empty OS status?"
2394
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2395
      if not b_status:
2396
        # base OS is invalid, skipping
2397
        continue
2398
      for kind, a, b in [("API version", f_api, b_api),
2399
                         ("variants list", f_var, b_var),
2400
                         ("parameters", beautify_params(f_param),
2401
                          beautify_params(b_param))]:
2402
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2403
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2404
                 kind, os_name, base.name,
2405
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2406

    
2407
    # check any missing OSes
2408
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2409
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2410
             "OSes present on reference node %s but missing on this node: %s",
2411
             base.name, utils.CommaJoin(missing))
2412

    
2413
  def _VerifyOob(self, ninfo, nresult):
2414
    """Verifies out of band functionality of a node.
2415

2416
    @type ninfo: L{objects.Node}
2417
    @param ninfo: the node to check
2418
    @param nresult: the remote results for the node
2419

2420
    """
2421
    node = ninfo.name
2422
    # We just have to verify the paths on master and/or master candidates
2423
    # as the oob helper is invoked on the master
2424
    if ((ninfo.master_candidate or ninfo.master_capable) and
2425
        constants.NV_OOB_PATHS in nresult):
2426
      for path_result in nresult[constants.NV_OOB_PATHS]:
2427
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2428

    
2429
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2430
    """Verifies and updates the node volume data.
2431

2432
    This function will update a L{NodeImage}'s internal structures
2433
    with data from the remote call.
2434

2435
    @type ninfo: L{objects.Node}
2436
    @param ninfo: the node to check
2437
    @param nresult: the remote results for the node
2438
    @param nimg: the node image object
2439
    @param vg_name: the configured VG name
2440

2441
    """
2442
    node = ninfo.name
2443
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2444

    
2445
    nimg.lvm_fail = True
2446
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2447
    if vg_name is None:
2448
      pass
2449
    elif isinstance(lvdata, basestring):
2450
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2451
               utils.SafeEncode(lvdata))
2452
    elif not isinstance(lvdata, dict):
2453
      _ErrorIf(True, constants.CV_ENODELVM, node,
2454
               "rpc call to node failed (lvlist)")
2455
    else:
2456
      nimg.volumes = lvdata
2457
      nimg.lvm_fail = False
2458

    
2459
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2460
    """Verifies and updates the node instance list.
2461

2462
    If the listing was successful, then updates this node's instance
2463
    list. Otherwise, it marks the RPC call as failed for the instance
2464
    list key.
2465

2466
    @type ninfo: L{objects.Node}
2467
    @param ninfo: the node to check
2468
    @param nresult: the remote results for the node
2469
    @param nimg: the node image object
2470

2471
    """
2472
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2473
    test = not isinstance(idata, list)
2474
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2475
                  "rpc call to node failed (instancelist): %s",
2476
                  utils.SafeEncode(str(idata)))
2477
    if test:
2478
      nimg.hyp_fail = True
2479
    else:
2480
      nimg.instances = idata
2481

    
2482
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2483
    """Verifies and computes a node information map
2484

2485
    @type ninfo: L{objects.Node}
2486
    @param ninfo: the node to check
2487
    @param nresult: the remote results for the node
2488
    @param nimg: the node image object
2489
    @param vg_name: the configured VG name
2490

2491
    """
2492
    node = ninfo.name
2493
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2494

    
2495
    # try to read free memory (from the hypervisor)
2496
    hv_info = nresult.get(constants.NV_HVINFO, None)
2497
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2498
    _ErrorIf(test, constants.CV_ENODEHV, node,
2499
             "rpc call to node failed (hvinfo)")
2500
    if not test:
2501
      try:
2502
        nimg.mfree = int(hv_info["memory_free"])
2503
      except (ValueError, TypeError):
2504
        _ErrorIf(True, constants.CV_ENODERPC, node,
2505
                 "node returned invalid nodeinfo, check hypervisor")
2506

    
2507
    # FIXME: devise a free space model for file based instances as well
2508
    if vg_name is not None:
2509
      test = (constants.NV_VGLIST not in nresult or
2510
              vg_name not in nresult[constants.NV_VGLIST])
2511
      _ErrorIf(test, constants.CV_ENODELVM, node,
2512
               "node didn't return data for the volume group '%s'"
2513
               " - it is either missing or broken", vg_name)
2514
      if not test:
2515
        try:
2516
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2517
        except (ValueError, TypeError):
2518
          _ErrorIf(True, constants.CV_ENODERPC, node,
2519
                   "node returned invalid LVM info, check LVM status")
2520

    
2521
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2522
    """Gets per-disk status information for all instances.
2523

2524
    @type nodelist: list of strings
2525
    @param nodelist: Node names
2526
    @type node_image: dict of (name, L{NodeImage})
    @param node_image: Node image objects
2528
    @type instanceinfo: dict of (name, L{objects.Instance})
2529
    @param instanceinfo: Instance objects
2530
    @rtype: {instance: {node: [(success, payload)]}}
2531
    @return: a dictionary of per-instance dictionaries with nodes as
2532
        keys and disk information as values; the disk information is a
2533
        list of tuples (success, payload)
2534

2535
    """
2536
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2537

    
2538
    node_disks = {}
2539
    node_disks_devonly = {}
2540
    diskless_instances = set()
2541
    diskless = constants.DT_DISKLESS
2542

    
2543
    for nname in nodelist:
2544
      node_instances = list(itertools.chain(node_image[nname].pinst,
2545
                                            node_image[nname].sinst))
2546
      diskless_instances.update(inst for inst in node_instances
2547
                                if instanceinfo[inst].disk_template == diskless)
2548
      disks = [(inst, disk)
2549
               for inst in node_instances
2550
               for disk in instanceinfo[inst].disks]
2551

    
2552
      if not disks:
2553
        # No need to collect data
2554
        continue
2555

    
2556
      node_disks[nname] = disks
2557

    
2558
      # Creating copies as SetDiskID below will modify the objects and that can
2559
      # lead to incorrect data returned from nodes
2560
      devonly = [dev.Copy() for (_, dev) in disks]
2561

    
2562
      for dev in devonly:
2563
        self.cfg.SetDiskID(dev, nname)
2564

    
2565
      node_disks_devonly[nname] = devonly
2566

    
2567
    assert len(node_disks) == len(node_disks_devonly)
2568

    
2569
    # Collect data from all nodes with disks
2570
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2571
                                                          node_disks_devonly)
2572

    
2573
    assert len(result) == len(node_disks)
2574

    
2575
    instdisk = {}
2576

    
2577
    for (nname, nres) in result.items():
2578
      disks = node_disks[nname]
2579

    
2580
      if nres.offline:
2581
        # No data from this node
2582
        data = len(disks) * [(False, "node offline")]
2583
      else:
2584
        msg = nres.fail_msg
2585
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2586
                 "while getting disk information: %s", msg)
2587
        if msg:
2588
          # No data from this node
2589
          data = len(disks) * [(False, msg)]
2590
        else:
2591
          data = []
2592
          for idx, i in enumerate(nres.payload):
2593
            if isinstance(i, (tuple, list)) and len(i) == 2:
2594
              data.append(i)
2595
            else:
2596
              logging.warning("Invalid result from node %s, entry %d: %s",
2597
                              nname, idx, i)
2598
              data.append((False, "Invalid result from the remote node"))
2599

    
2600
      for ((inst, _), status) in zip(disks, data):
2601
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2602

    
2603
    # Add empty entries for diskless instances.
2604
    for inst in diskless_instances:
2605
      assert inst not in instdisk
2606
      instdisk[inst] = {}
2607

    
2608
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2609
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2610
                      compat.all(isinstance(s, (tuple, list)) and
2611
                                 len(s) == 2 for s in statuses)
2612
                      for inst, nnames in instdisk.items()
2613
                      for nname, statuses in nnames.items())
2614
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2615

    
2616
    return instdisk
2617

    
2618
  @staticmethod
2619
  def _SshNodeSelector(group_uuid, all_nodes):
2620
    """Create endless iterators for all potential SSH check hosts.
2621

2622
    """
2623
    nodes = [node for node in all_nodes
2624
             if (node.group != group_uuid and
2625
                 not node.offline)]
2626
    keyfunc = operator.attrgetter("group")
2627

    
2628
    return map(itertools.cycle,
2629
               [sorted(map(operator.attrgetter("name"), names))
2630
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2631
                                                  keyfunc)])
2632

    
2633
  @classmethod
2634
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2635
    """Choose which nodes should talk to which other nodes.
2636

2637
    We will make nodes contact all nodes in their group, and one node from
2638
    every other group.
2639

2640
    @warning: This algorithm has a known issue if one node group is much
2641
      smaller than others (e.g. just one node). In such a case all other
2642
      nodes will talk to the single node.
2643

2644
    """
2645
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2646
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2647

    
2648
    return (online_nodes,
2649
            dict((name, sorted([i.next() for i in sel]))
2650
                 for name in online_nodes))
2651

    
2652
  def BuildHooksEnv(self):
2653
    """Build hooks env.
2654

2655
    Cluster-Verify hooks just ran in the post phase and their failure makes
2656
    the output be logged in the verify output and the verification to fail.
2657

2658
    """
2659
    env = {
2660
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2661
      }
2662

    
2663
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2664
               for node in self.my_node_info.values())
2665

    
2666
    return env
2667

    
2668
  def BuildHooksNodes(self):
2669
    """Build hooks nodes.
2670

2671
    """
2672
    return ([], self.my_node_names)
2673

    
2674
  def Exec(self, feedback_fn):
2675
    """Verify integrity of the node group, performing various test on nodes.
2676

2677
    """
2678
    # This method has too many local variables. pylint: disable=R0914
2679
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2680

    
2681
    if not self.my_node_names:
2682
      # empty node group
2683
      feedback_fn("* Empty node group, skipping verification")
2684
      return True
2685

    
2686
    self.bad = False
2687
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2688
    verbose = self.op.verbose
2689
    self._feedback_fn = feedback_fn
2690

    
2691
    vg_name = self.cfg.GetVGName()
2692
    drbd_helper = self.cfg.GetDRBDHelper()
2693
    cluster = self.cfg.GetClusterInfo()
2694
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2695
    hypervisors = cluster.enabled_hypervisors
2696
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2697

    
2698
    i_non_redundant = [] # Non redundant instances
2699
    i_non_a_balanced = [] # Non auto-balanced instances
2700
    i_offline = 0 # Count of offline instances
2701
    n_offline = 0 # Count of offline nodes
2702
    n_drained = 0 # Count of nodes being drained
2703
    node_vol_should = {}
2704

    
2705
    # FIXME: verify OS list
2706

    
2707
    # File verification
2708
    filemap = _ComputeAncillaryFiles(cluster, False)
2709

    
2710
    # do local checksums
2711
    master_node = self.master_node = self.cfg.GetMasterNode()
2712
    master_ip = self.cfg.GetMasterIP()
2713

    
2714
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2715

    
2716
    user_scripts = []
2717
    if self.cfg.GetUseExternalMipScript():
2718
      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
2719

    
2720
    node_verify_param = {
2721
      constants.NV_FILELIST:
2722
        utils.UniqueSequence(filename
2723
                             for files in filemap
2724
                             for filename in files),
2725
      constants.NV_NODELIST:
2726
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2727
                                  self.all_node_info.values()),
2728
      constants.NV_HYPERVISOR: hypervisors,
2729
      constants.NV_HVPARAMS:
2730
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2731
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2732
                                 for node in node_data_list
2733
                                 if not node.offline],
2734
      constants.NV_INSTANCELIST: hypervisors,
2735
      constants.NV_VERSION: None,
2736
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2737
      constants.NV_NODESETUP: None,
2738
      constants.NV_TIME: None,
2739
      constants.NV_MASTERIP: (master_node, master_ip),
2740
      constants.NV_OSLIST: None,
2741
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2742
      constants.NV_USERSCRIPTS: user_scripts,
2743
      }
2744

    
2745
    if vg_name is not None:
2746
      node_verify_param[constants.NV_VGLIST] = None
2747
      node_verify_param[constants.NV_LVLIST] = vg_name
2748
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2749
      node_verify_param[constants.NV_DRBDLIST] = None
2750

    
2751
    if drbd_helper:
2752
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2753

    
2754
    # bridge checks
2755
    # FIXME: this needs to be changed per node-group, not cluster-wide
2756
    bridges = set()
2757
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2758
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2759
      bridges.add(default_nicpp[constants.NIC_LINK])
2760
    for instance in self.my_inst_info.values():
2761
      for nic in instance.nics:
2762
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2763
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2764
          bridges.add(full_nic[constants.NIC_LINK])
2765

    
2766
    if bridges:
2767
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2768

    
2769
    # Build our expected cluster state
2770
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2771
                                                 name=node.name,
2772
                                                 vm_capable=node.vm_capable))
2773
                      for node in node_data_list)
2774

    
2775
    # Gather OOB paths
2776
    oob_paths = []
2777
    for node in self.all_node_info.values():
2778
      path = _SupportsOob(self.cfg, node)
2779
      if path and path not in oob_paths:
2780
        oob_paths.append(path)
2781

    
2782
    if oob_paths:
2783
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2784

    
2785
    for instance in self.my_inst_names:
2786
      inst_config = self.my_inst_info[instance]
2787

    
2788
      for nname in inst_config.all_nodes:
2789
        if nname not in node_image:
2790
          gnode = self.NodeImage(name=nname)
2791
          gnode.ghost = (nname not in self.all_node_info)
2792
          node_image[nname] = gnode
2793

    
2794
      inst_config.MapLVsByNode(node_vol_should)
2795

    
2796
      pnode = inst_config.primary_node
2797
      node_image[pnode].pinst.append(instance)
2798

    
2799
      for snode in inst_config.secondary_nodes:
2800
        nimg = node_image[snode]
2801
        nimg.sinst.append(instance)
2802
        if pnode not in nimg.sbp:
2803
          nimg.sbp[pnode] = []
2804
        nimg.sbp[pnode].append(instance)
2805

    
2806
    # At this point, we have the in-memory data structures complete,
2807
    # except for the runtime information, which we'll gather next
2808

    
2809
    # Due to the way our RPC system works, exact response times cannot be
2810
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2811
    # time before and after executing the request, we can at least have a time
2812
    # window.
2813
    nvinfo_starttime = time.time()
2814
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2815
                                           node_verify_param,
2816
                                           self.cfg.GetClusterName())
2817
    nvinfo_endtime = time.time()
2818

    
2819
    if self.extra_lv_nodes and vg_name is not None:
2820
      extra_lv_nvinfo = \
2821
          self.rpc.call_node_verify(self.extra_lv_nodes,
2822
                                    {constants.NV_LVLIST: vg_name},
2823
                                    self.cfg.GetClusterName())
2824
    else:
2825
      extra_lv_nvinfo = {}
2826

    
2827
    all_drbd_map = self.cfg.ComputeDRBDMap()
2828

    
2829
    feedback_fn("* Gathering disk information (%s nodes)" %
2830
                len(self.my_node_names))
2831
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2832
                                     self.my_inst_info)
2833

    
2834
    feedback_fn("* Verifying configuration file consistency")
2835

    
2836
    # If not all nodes are being checked, we need to make sure the master node
2837
    # and a non-checked vm_capable node are in the list.
2838
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2839
    if absent_nodes:
2840
      vf_nvinfo = all_nvinfo.copy()
2841
      vf_node_info = list(self.my_node_info.values())
2842
      additional_nodes = []
2843
      if master_node not in self.my_node_info:
2844
        additional_nodes.append(master_node)
2845
        vf_node_info.append(self.all_node_info[master_node])
2846
      # Add the first vm_capable node we find which is not included
2847
      for node in absent_nodes:
2848
        nodeinfo = self.all_node_info[node]
2849
        if nodeinfo.vm_capable and not nodeinfo.offline:
2850
          additional_nodes.append(node)
2851
          vf_node_info.append(self.all_node_info[node])
2852
          break
2853
      key = constants.NV_FILELIST
2854
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2855
                                                 {key: node_verify_param[key]},
2856
                                                 self.cfg.GetClusterName()))
2857
    else:
2858
      vf_nvinfo = all_nvinfo
2859
      vf_node_info = self.my_node_info.values()
2860

    
2861
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2862

    
2863
    feedback_fn("* Verifying node status")
2864

    
2865
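    # refos_img will hold the first node image with valid OS data; it serves
    # as the reference against which the other nodes' OS lists are compared
    # in _VerifyNodeOS below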
    refos_img = None
2866

    
2867
    for node_i in node_data_list:
2868
      node = node_i.name
2869
      nimg = node_image[node]
2870

    
2871
      if node_i.offline:
2872
        if verbose:
2873
          feedback_fn("* Skipping offline node %s" % (node,))
2874
        n_offline += 1
2875
        continue
2876

    
2877
      if node == master_node:
2878
        ntype = "master"
2879
      elif node_i.master_candidate:
2880
        ntype = "master candidate"
2881
      elif node_i.drained:
2882
        ntype = "drained"
2883
        n_drained += 1
2884
      else:
2885
        ntype = "regular"
2886
      if verbose:
2887
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2888

    
2889
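      # if the node_verify RPC itself failed we cannot run any further checks
      # for this node; record the failure and skip it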
      msg = all_nvinfo[node].fail_msg
2890
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2891
               msg)
2892
      if msg:
2893
        nimg.rpc_fail = True
2894
        continue
2895

    
2896
      nresult = all_nvinfo[node].payload
2897

    
2898
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2899
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2900
      self._VerifyNodeNetwork(node_i, nresult)
2901
      self._VerifyNodeUserScripts(node_i, nresult)
2902
      self._VerifyOob(node_i, nresult)
2903

    
2904
      if nimg.vm_capable:
2905
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2906
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2907
                             all_drbd_map)
2908

    
2909
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2910
        self._UpdateNodeInstances(node_i, nresult, nimg)
2911
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2912
        self._UpdateNodeOS(node_i, nresult, nimg)
2913

    
2914
        if not nimg.os_fail:
2915
          if refos_img is None:
2916
            refos_img = nimg
2917
          self._VerifyNodeOS(node_i, nimg, refos_img)
2918
        self._VerifyNodeBridges(node_i, nresult, bridges)
2919

    
2920
        # Check whether all running instances are primary for the node. (This
2921
        # can no longer be done from _VerifyInstance below, since some of the
2922
        # wrong instances could be from other node groups.)
2923
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2924

    
2925
        for inst in non_primary_inst:
2926
          # FIXME: investigate best way to handle offline insts
2927
          if inst.admin_state == constants.ADMINST_OFFLINE:
2928
            if verbose:
2929
              feedback_fn("* Skipping offline instance %s" % inst.name)
2930
            i_offline += 1
2931
            continue
2932
          test = inst in self.all_inst_info
2933
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2934
                   "instance should not run on node %s", node_i.name)
2935
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2936
                   "node is running unknown instance %s", inst)
2937

    
2938
    for node, result in extra_lv_nvinfo.items():
2939
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2940
                              node_image[node], vg_name)
2941

    
2942
    feedback_fn("* Verifying instance status")
2943
    for instance in self.my_inst_names:
2944
      if verbose:
2945
        feedback_fn("* Verifying instance %s" % instance)
2946
      inst_config = self.my_inst_info[instance]
2947
      self._VerifyInstance(instance, inst_config, node_image,
2948
                           instdisk[instance])
2949
      inst_nodes_offline = []
2950

    
2951
      pnode = inst_config.primary_node
2952
      pnode_img = node_image[pnode]
2953
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2954
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2955
               " primary node failed", instance)
2956

    
2957
      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
2958
               pnode_img.offline,
2959
               constants.CV_EINSTANCEBADNODE, instance,
2960
               "instance is marked as running and lives on offline node %s",
2961
               inst_config.primary_node)
2962

    
2963
      # If the instance is non-redundant we cannot survive losing its primary
2964
      # node, so we are not N+1 compliant. On the other hand we have no disk
2965
      # templates with more than one secondary so that situation is not well
2966
      # supported either.
2967
      # FIXME: does not support file-backed instances
2968
      if not inst_config.secondary_nodes:
2969
        i_non_redundant.append(instance)
2970

    
2971
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2972
               constants.CV_EINSTANCELAYOUT,
2973
               instance, "instance has multiple secondary nodes: %s",
2974
               utils.CommaJoin(inst_config.secondary_nodes),
2975
               code=self.ETYPE_WARNING)
2976

    
2977
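      # for internally mirrored disk templates (e.g. DRBD) warn when the
      # primary and secondary nodes ended up in different node groups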
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2978
        pnode = inst_config.primary_node
2979
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2980
        instance_groups = {}
2981

    
2982
        for node in instance_nodes:
2983
          instance_groups.setdefault(self.all_node_info[node].group,
2984
                                     []).append(node)
2985

    
2986
        pretty_list = [
2987
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2988
          # Sort so that we always list the primary node first.
2989
          for group, nodes in sorted(instance_groups.items(),
2990
                                     key=lambda (_, nodes): pnode in nodes,
2991
                                     reverse=True)]
2992

    
2993
        self._ErrorIf(len(instance_groups) > 1,
2994
                      constants.CV_EINSTANCESPLITGROUPS,
2995
                      instance, "instance has primary and secondary nodes in"
2996
                      " different groups: %s", utils.CommaJoin(pretty_list),
2997
                      code=self.ETYPE_WARNING)
2998

    
2999
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
3000
        i_non_a_balanced.append(instance)
3001

    
3002
      for snode in inst_config.secondary_nodes:
3003
        s_img = node_image[snode]
3004
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
3005
                 snode, "instance %s, connection to secondary node failed",
3006
                 instance)
3007

    
3008
        if s_img.offline:
3009
          inst_nodes_offline.append(snode)
3010

    
3011
      # warn that the instance lives on offline nodes
3012
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
3013
               "instance has offline secondary node(s) %s",
3014
               utils.CommaJoin(inst_nodes_offline))
3015
      # ... or ghost/non-vm_capable nodes
3016
      for node in inst_config.all_nodes:
3017
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
3018
                 instance, "instance lives on ghost node %s", node)
3019
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
3020
                 instance, "instance lives on non-vm_capable node %s", node)
3021

    
3022
    feedback_fn("* Verifying orphan volumes")
3023
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3024

    
3025
    # We will get spurious "unknown volume" warnings if any node of this group
3026
    # is secondary for an instance whose primary is in another group. To avoid
3027
    # them, we find these instances and add their volumes to node_vol_should.
3028
    for inst in self.all_inst_info.values():
3029
      for secondary in inst.secondary_nodes:
3030
        if (secondary in self.my_node_info
3031
            and inst.name not in self.my_inst_info):
3032
          inst.MapLVsByNode(node_vol_should)
3033
          break
3034

    
3035
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3036

    
3037
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3038
      feedback_fn("* Verifying N+1 Memory redundancy")
3039
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3040

    
3041
    feedback_fn("* Other Notes")
3042
    if i_non_redundant:
3043
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3044
                  % len(i_non_redundant))
3045

    
3046
    if i_non_a_balanced:
3047
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3048
                  % len(i_non_a_balanced))
3049

    
3050
    if i_offline:
3051
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3052

    
3053
    if n_offline:
3054
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3055

    
3056
    if n_drained:
3057
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3058

    
3059
    return not self.bad
3060

    
3061
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3062
    """Analyze the post-hooks' result
3063

3064
    This method analyses the hook result, handles it, and sends some
3065
    nicely-formatted feedback back to the user.
3066

3067
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3068
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3069
    @param hooks_results: the results of the multi-node hooks rpc call
3070
    @param feedback_fn: function used to send feedback back to the caller
3071
    @param lu_result: previous Exec result
3072
    @return: the new Exec result, based on the previous result
3073
        and hook results
3074

3075
    """
3076
    # We only really run POST phase hooks, only for non-empty groups,
3077
    # and are only interested in their results
3078
    if not self.my_node_names:
3079
      # empty node group
3080
      pass
3081
    elif phase == constants.HOOKS_PHASE_POST:
3082
      # Used to change hooks' output to proper indentation
3083
      feedback_fn("* Hooks Results")
3084
      assert hooks_results, "invalid result from hooks"
3085

    
3086
      for node_name in hooks_results:
3087
        res = hooks_results[node_name]
3088
        msg = res.fail_msg
3089
        test = msg and not res.offline
3090
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3091
                      "Communication failure in hooks execution: %s", msg)
3092
        if res.offline or msg:
3093
          # No need to investigate payload if node is offline or gave
3094
          # an error.
3095
          continue
3096
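        # each payload entry is a (script, status, output) tuple; only
        # HKR_FAIL results are reported and flip the overall result to False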
        for script, hkr, output in res.payload:
3097
          test = hkr == constants.HKR_FAIL
3098
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3099
                        "Script %s failed, output:", script)
3100
          if test:
3101
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3102
            feedback_fn("%s" % output)
3103
            lu_result = False
3104

    
3105
    return lu_result
3106

    
3107

    
3108
class LUClusterVerifyDisks(NoHooksLU):
3109
  """Verifies the cluster disks status.
3110

3111
  """
3112
  REQ_BGL = False
3113

    
3114
  def ExpandNames(self):
3115
    self.share_locks = _ShareAll()
3116
    self.needed_locks = {
3117
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3118
      }
3119

    
3120
  def Exec(self, feedback_fn):
3121
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3122

    
3123
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3124
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3125
                           for group in group_names])
3126

    
3127

    
3128
class LUGroupVerifyDisks(NoHooksLU):
3129
  """Verifies the status of all disks in a node group.
3130

3131
  """
3132
  REQ_BGL = False
3133

    
3134
  def ExpandNames(self):
3135
    # Raises errors.OpPrereqError on its own if group can't be found
3136
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3137

    
3138
    self.share_locks = _ShareAll()
3139
    self.needed_locks = {
3140
      locking.LEVEL_INSTANCE: [],
3141
      locking.LEVEL_NODEGROUP: [],
3142
      locking.LEVEL_NODE: [],
3143
      }
3144

    
3145
  def DeclareLocks(self, level):
3146
    if level == locking.LEVEL_INSTANCE:
3147
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3148

    
3149
      # Lock instances optimistically, needs verification once node and group
3150
      # locks have been acquired
3151
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3152
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3153

    
3154
    elif level == locking.LEVEL_NODEGROUP:
3155
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3156

    
3157
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3158
        set([self.group_uuid] +
3159
            # Lock all groups used by instances optimistically; this requires
3160
            # going via the node before it's locked, requiring verification
3161
            # later on
3162
            [group_uuid
3163
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3164
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3165

    
3166
    elif level == locking.LEVEL_NODE:
3167
      # This will only lock the nodes in the group to be verified which contain
3168
      # actual instances
3169
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3170
      self._LockInstancesNodes()
3171

    
3172
      # Lock all nodes in group to be verified
3173
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3174
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3175
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3176

    
3177
  def CheckPrereq(self):
3178
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3179
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3180
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3181

    
3182
    assert self.group_uuid in owned_groups
3183

    
3184
    # Check if locked instances are still correct
3185
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3186

    
3187
    # Get instance information
3188
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3189

    
3190
    # Check if node groups for locked instances are still correct
3191
    for (instance_name, inst) in self.instances.items():
3192
      assert owned_nodes.issuperset(inst.all_nodes), \
3193
        "Instance %s's nodes changed while we kept the lock" % instance_name
3194

    
3195
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3196
                                             owned_groups)
3197

    
3198
      assert self.group_uuid in inst_groups, \
3199
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3200

    
3201
  def Exec(self, feedback_fn):
3202
    """Verify integrity of cluster disks.
3203

3204
    @rtype: tuple of three items
3205
    @return: a tuple of (dict of node-to-node_error, list of instances
3206
        which need activate-disks, dict of instance: (node, volume) for
3207
        missing volumes)
3208

3209
    """
3210
    res_nodes = {}
3211
    res_instances = set()
3212
    res_missing = {}
3213

    
3214
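    # nv_dict maps (node name, LV name) to the owning instance for every
    # instance marked as running; entries still left after processing the
    # per-node LV lists below correspond to missing volumes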
    nv_dict = _MapInstanceDisksToNodes([inst
3215
            for inst in self.instances.values()
3216
            if inst.admin_state == constants.ADMINST_UP])
3217

    
3218
    if nv_dict:
3219
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3220
                             set(self.cfg.GetVmCapableNodeList()))
3221

    
3222
      node_lvs = self.rpc.call_lv_list(nodes, [])
3223

    
3224
      for (node, node_res) in node_lvs.items():
3225
        if node_res.offline:
3226
          continue
3227

    
3228
        msg = node_res.fail_msg
3229
        if msg:
3230
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3231
          res_nodes[node] = msg
3232
          continue
3233

    
3234
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3235
          inst = nv_dict.pop((node, lv_name), None)
3236
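          # an LV that is present but not online and belongs to a known
          # instance means that instance needs its disks re-activated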
          if not (lv_online or inst is None):
3237
            res_instances.add(inst)
3238

    
3239
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3240
      # better
3241
      for key, inst in nv_dict.iteritems():
3242
        res_missing.setdefault(inst, []).append(list(key))
3243

    
3244
    return (res_nodes, list(res_instances), res_missing)
3245

    
3246

    
3247
class LUClusterRepairDiskSizes(NoHooksLU):
3248
  """Verifies the cluster disks sizes.
3249

3250
  """
3251
  REQ_BGL = False
3252

    
3253
  def ExpandNames(self):
3254
    if self.op.instances:
3255
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3256
      self.needed_locks = {
3257
        locking.LEVEL_NODE_RES: [],
3258
        locking.LEVEL_INSTANCE: self.wanted_names,
3259
        }
3260
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3261
    else:
3262
      self.wanted_names = None
3263
      self.needed_locks = {
3264
        locking.LEVEL_NODE_RES: locking.ALL_SET,
3265
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3266
        }
3267
    self.share_locks = {
3268
      locking.LEVEL_NODE_RES: 1,
3269
      locking.LEVEL_INSTANCE: 0,
3270
      }
3271

    
3272
  def DeclareLocks(self, level):
3273
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
3274
      self._LockInstancesNodes(primary_only=True, level=level)
3275

    
3276
  def CheckPrereq(self):
3277
    """Check prerequisites.
3278

3279
    This only checks the optional instance list against the existing names.
3280

3281
    """
3282
    if self.wanted_names is None:
3283
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3284

    
3285
    self.wanted_instances = \
3286
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3287

    
3288
  def _EnsureChildSizes(self, disk):
3289
    """Ensure children of the disk have the needed disk size.
3290

3291
    This is valid mainly for DRBD8 and fixes an issue where the
3292
    children have a smaller disk size than the parent.
3293

3294
    @param disk: an L{ganeti.objects.Disk} object
3295

3296
    """
3297
    if disk.dev_type == constants.LD_DRBD8:
3298
      assert disk.children, "Empty children for DRBD8?"
3299
      fchild = disk.children[0]
3300
      mismatch = fchild.size < disk.size
3301
      if mismatch:
3302
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3303
                     fchild.size, disk.size)
3304
        fchild.size = disk.size
3305

    
3306
      # and we recurse on this child only, not on the metadev
3307
      return self._EnsureChildSizes(fchild) or mismatch
3308
    else:
3309
      return False
3310

    
3311
  def Exec(self, feedback_fn):
3312
    """Verify the size of cluster disks.
3313

3314
    """
3315
    # TODO: check child disks too
3316
    # TODO: check differences in size between primary/secondary nodes
3317
    per_node_disks = {}
3318
    for instance in self.wanted_instances:
3319
      pnode = instance.primary_node
3320
      if pnode not in per_node_disks:
3321
        per_node_disks[pnode] = []
3322
      for idx, disk in enumerate(instance.disks):
3323
        per_node_disks[pnode].append((instance, idx, disk))
3324

    
3325
    assert not (frozenset(per_node_disks.keys()) -
3326
                self.owned_locks(locking.LEVEL_NODE_RES)), \
3327
      "Not owning correct locks"
3328
    assert not self.owned_locks(locking.LEVEL_NODE)
3329

    
3330
    changed = []
3331
    for node, dskl in per_node_disks.items():
3332
      newl = [v[2].Copy() for v in dskl]
3333
      for dsk in newl:
3334
        self.cfg.SetDiskID(dsk, node)
3335
      result = self.rpc.call_blockdev_getsize(node, newl)
3336
      if result.fail_msg:
3337
        self.LogWarning("Failure in blockdev_getsize call to node"
3338
                        " %s, ignoring", node)
3339
        continue
3340
      if len(result.payload) != len(dskl):
3341
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3342
                        " result.payload=%s", node, len(dskl), result.payload)
3343
        self.LogWarning("Invalid result from node %s, ignoring node results",
3344
                        node)
3345
        continue
3346
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3347
        if size is None:
3348
          self.LogWarning("Disk %d of instance %s did not return size"
3349
                          " information, ignoring", idx, instance.name)
3350
          continue
3351
        if not isinstance(size, (int, long)):
3352
          self.LogWarning("Disk %d of instance %s did not return valid"
3353
                          " size information, ignoring", idx, instance.name)
3354
          continue
3355
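        # the reported size is in bytes; shift by 20 bits to convert to MiB,
        # the unit used for disk sizes in the configuration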
        size = size >> 20
3356
        if size != disk.size:
3357
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3358
                       " correcting: recorded %d, actual %d", idx,
3359
                       instance.name, disk.size, size)
3360
          disk.size = size
3361
          self.cfg.Update(instance, feedback_fn)
3362
          changed.append((instance.name, idx, size))
3363
        if self._EnsureChildSizes(disk):
3364
          self.cfg.Update(instance, feedback_fn)
3365
          changed.append((instance.name, idx, disk.size))
3366
    return changed
3367

    
3368

    
3369
class LUClusterRename(LogicalUnit):
3370
  """Rename the cluster.
3371

3372
  """
3373
  HPATH = "cluster-rename"
3374
  HTYPE = constants.HTYPE_CLUSTER
3375

    
3376
  def BuildHooksEnv(self):
3377
    """Build hooks env.
3378

3379
    """
3380
    return {
3381
      "OP_TARGET": self.cfg.GetClusterName(),
3382
      "NEW_NAME": self.op.name,
3383
      }
3384

    
3385
  def BuildHooksNodes(self):
3386
    """Build hooks nodes.
3387

3388
    """
3389
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3390

    
3391
  def CheckPrereq(self):
3392
    """Verify that the passed name is a valid one.
3393

3394
    """
3395
    hostname = netutils.GetHostname(name=self.op.name,
3396
                                    family=self.cfg.GetPrimaryIPFamily())
3397

    
3398
    new_name = hostname.name
3399
    self.ip = new_ip = hostname.ip
3400
    old_name = self.cfg.GetClusterName()
3401
    old_ip = self.cfg.GetMasterIP()
3402
    if new_name == old_name and new_ip == old_ip:
3403
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3404
                                 " cluster has changed",
3405
                                 errors.ECODE_INVAL)
3406
    if new_ip != old_ip:
3407
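      # refuse the rename if the new master IP already answers on the
      # network, since activating it later would clash with that host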
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3408
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3409
                                   " reachable on the network" %
3410
                                   new_ip, errors.ECODE_NOTUNIQUE)
3411

    
3412
    self.op.name = new_name
3413

    
3414
  def Exec(self, feedback_fn):
3415
    """Rename the cluster.
3416

3417
    """
3418
    clustername = self.op.name
3419
    new_ip = self.ip
3420

    
3421
    # shutdown the master IP
3422
    master_params = self.cfg.GetMasterNetworkParameters()
3423
    ems = self.cfg.GetUseExternalMipScript()
3424
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3425
                                                     master_params, ems)
3426
    result.Raise("Could not disable the master role")
3427

    
3428
    try:
3429
      cluster = self.cfg.GetClusterInfo()
3430
      cluster.cluster_name = clustername
3431
      cluster.master_ip = new_ip
3432
      self.cfg.Update(cluster, feedback_fn)
3433

    
3434
      # update the known hosts file
3435
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3436
      node_list = self.cfg.GetOnlineNodeList()
3437
      try:
3438
        node_list.remove(master_params.name)
3439
      except ValueError:
3440
        pass
3441
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3442
    finally:
3443
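      # always try to bring the master IP back up, even if updating the
      # configuration or redistributing the known_hosts file failed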
      master_params.ip = new_ip
3444
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3445
                                                     master_params, ems)
3446
      msg = result.fail_msg
3447
      if msg:
3448
        self.LogWarning("Could not re-enable the master role on"
3449
                        " the master, please restart manually: %s", msg)
3450

    
3451
    return clustername
3452

    
3453

    
3454
def _ValidateNetmask(cfg, netmask):
3455
  """Checks if a netmask is valid.
3456

3457
  @type cfg: L{config.ConfigWriter}
3458
  @param cfg: The cluster configuration
3459
  @type netmask: int
3460
  @param netmask: the netmask to be verified
3461
  @raise errors.OpPrereqError: if the validation fails
3462

3463
  """
3464
  ip_family = cfg.GetPrimaryIPFamily()
3465
  try:
3466
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
3467
  except errors.ProgrammerError:
3468
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
3469
                               ip_family)
3470
  if not ipcls.ValidateNetmask(netmask):
3471
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
3472
                               (netmask))
3473

    
3474

    
3475
class LUClusterSetParams(LogicalUnit):
3476
  """Change the parameters of the cluster.
3477

3478
  """
3479
  HPATH = "cluster-modify"
3480
  HTYPE = constants.HTYPE_CLUSTER
3481
  REQ_BGL = False
3482

    
3483
  def CheckArguments(self):
3484
    """Check parameters
3485

3486
    """
3487
    if self.op.uid_pool:
3488
      uidpool.CheckUidPool(self.op.uid_pool)
3489

    
3490
    if self.op.add_uids:
3491
      uidpool.CheckUidPool(self.op.add_uids)
3492

    
3493
    if self.op.remove_uids:
3494
      uidpool.CheckUidPool(self.op.remove_uids)
3495

    
3496
    if self.op.master_netmask is not None:
3497
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3498

    
3499
  def ExpandNames(self):
3500
    # FIXME: in the future maybe other cluster params won't require checking on
3501
    # all nodes to be modified.
3502
    self.needed_locks = {
3503
      locking.LEVEL_NODE: locking.ALL_SET,
3504
    }
3505
    self.share_locks[locking.LEVEL_NODE] = 1
3506

    
3507
  def BuildHooksEnv(self):
3508
    """Build hooks env.
3509

3510
    """
3511
    return {
3512
      "OP_TARGET": self.cfg.GetClusterName(),
3513
      "NEW_VG_NAME": self.op.vg_name,
3514
      }
3515

    
3516
  def BuildHooksNodes(self):
3517
    """Build hooks nodes.
3518

3519
    """
3520
    mn = self.cfg.GetMasterNode()
3521
    return ([mn], [mn])
3522

    
3523
  def CheckPrereq(self):
3524
    """Check prerequisites.
3525

3526
    This checks that the given parameters don't conflict and
3527
    that the given volume group is valid.
3528

3529
    """
3530
    if self.op.vg_name is not None and not self.op.vg_name:
3531
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3532
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3533
                                   " instances exist", errors.ECODE_INVAL)
3534

    
3535
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3536
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3537
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3538
                                   " drbd-based instances exist",
3539
                                   errors.ECODE_INVAL)
3540

    
3541
    node_list = self.owned_locks(locking.LEVEL_NODE)
3542

    
3543
    # if vg_name not None, checks given volume group on all nodes
3544
    if self.op.vg_name:
3545
      vglist = self.rpc.call_vg_list(node_list)
3546
      for node in node_list:
3547
        msg = vglist[node].fail_msg
3548
        if msg:
3549
          # ignoring down node
3550
          self.LogWarning("Error while gathering data on node %s"
3551
                          " (ignoring node): %s", node, msg)
3552
          continue
3553
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3554
                                              self.op.vg_name,
3555
                                              constants.MIN_VG_SIZE)
3556
        if vgstatus:
3557
          raise errors.OpPrereqError("Error on node '%s': %s" %
3558
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3559

    
3560
    if self.op.drbd_helper:
3561
      # checks given drbd helper on all nodes
3562
      helpers = self.rpc.call_drbd_helper(node_list)
3563
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3564
        if ninfo.offline:
3565
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3566
          continue
3567
        msg = helpers[node].fail_msg
3568
        if msg:
3569
          raise errors.OpPrereqError("Error checking drbd helper on node"
3570
                                     " '%s': %s" % (node, msg),
3571
                                     errors.ECODE_ENVIRON)
3572
        node_helper = helpers[node].payload
3573
        if node_helper != self.op.drbd_helper:
3574
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3575
                                     (node, node_helper), errors.ECODE_ENVIRON)
3576

    
3577
    self.cluster = cluster = self.cfg.GetClusterInfo()
3578
    # validate params changes
3579
    if self.op.beparams:
3580
      objects.UpgradeBeParams(self.op.beparams)
3581
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3582
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3583

    
3584
    if self.op.ndparams:
3585
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3586
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3587

    
3588
      # TODO: we need a more general way to handle resetting
3589
      # cluster-level parameters to default values
3590
      if self.new_ndparams["oob_program"] == "":
3591
        self.new_ndparams["oob_program"] = \
3592
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3593

    
3594
    if self.op.nicparams:
3595
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3596
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3597
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3598
      nic_errors = []
3599

    
3600
      # check all instances for consistency
3601
      for instance in self.cfg.GetAllInstancesInfo().values():
3602
        for nic_idx, nic in enumerate(instance.nics):
3603
          params_copy = copy.deepcopy(nic.nicparams)
3604
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3605

    
3606
          # check parameter syntax
3607
          try:
3608
            objects.NIC.CheckParameterSyntax(params_filled)
3609
          except errors.ConfigurationError, err:
3610
            nic_errors.append("Instance %s, nic/%d: %s" %
3611
                              (instance.name, nic_idx, err))
3612

    
3613
          # if we're moving instances to routed, check that they have an ip
3614
          target_mode = params_filled[constants.NIC_MODE]
3615
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3616
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3617
                              " address" % (instance.name, nic_idx))
3618
      if nic_errors:
3619
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3620
                                   "\n".join(nic_errors))
3621

    
3622
    # hypervisor list/parameters
3623
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3624
    if self.op.hvparams:
3625
      for hv_name, hv_dict in self.op.hvparams.items():
3626
        if hv_name not in self.new_hvparams:
3627
          self.new_hvparams[hv_name] = hv_dict
3628
        else:
3629
          self.new_hvparams[hv_name].update(hv_dict)
3630

    
3631
    # os hypervisor parameters
3632
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3633
    if self.op.os_hvp:
3634
      for os_name, hvs in self.op.os_hvp.items():
3635
        if os_name not in self.new_os_hvp:
3636
          self.new_os_hvp[os_name] = hvs
3637
        else:
3638
          for hv_name, hv_dict in hvs.items():
3639
            if hv_name not in self.new_os_hvp[os_name]:
3640
              self.new_os_hvp[os_name][hv_name] = hv_dict
3641
            else:
3642
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3643

    
3644
    # os parameters
3645
    self.new_osp = objects.FillDict(cluster.osparams, {})
3646
    if self.op.osparams:
3647
      for os_name, osp in self.op.osparams.items():
3648
        if os_name not in self.new_osp:
3649
          self.new_osp[os_name] = {}
3650

    
3651
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3652
                                                  use_none=True)
3653

    
3654
        if not self.new_osp[os_name]:
3655
          # we removed all parameters
3656
          del self.new_osp[os_name]
3657
        else:
3658
          # check the parameter validity (remote check)
3659
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3660
                         os_name, self.new_osp[os_name])
3661

    
3662
    # changes to the hypervisor list
3663
    if self.op.enabled_hypervisors is not None:
3664
      self.hv_list = self.op.enabled_hypervisors
3665
      for hv in self.hv_list:
3666
        # if the hypervisor doesn't already exist in the cluster
3667
        # hvparams, we initialize it to empty, and then (in both
3668
        # cases) we make sure to fill the defaults, as we might not
3669
        # have a complete defaults list if the hypervisor wasn't
3670
        # enabled before
3671
        if hv not in new_hvp:
3672
          new_hvp[hv] = {}
3673
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3674
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3675
    else:
3676
      self.hv_list = cluster.enabled_hypervisors
3677

    
3678
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3679
      # either the enabled list has changed, or the parameters have, validate
3680
      for hv_name, hv_params in self.new_hvparams.items():
3681
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3682
            (self.op.enabled_hypervisors and
3683
             hv_name in self.op.enabled_hypervisors)):
3684
          # either this is a new hypervisor, or its parameters have changed
3685
          hv_class = hypervisor.GetHypervisor(hv_name)
3686
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3687
          hv_class.CheckParameterSyntax(hv_params)
3688
          _CheckHVParams(self, node_list, hv_name, hv_params)
3689

    
3690
    if self.op.os_hvp:
3691
      # no need to check any newly-enabled hypervisors, since the
3692
      # defaults have already been checked in the above code-block
3693
      for os_name, os_hvp in self.new_os_hvp.items():
3694
        for hv_name, hv_params in os_hvp.items():
3695
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3696
          # we need to fill in the new os_hvp on top of the actual hv_p
3697
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3698
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3699
          hv_class = hypervisor.GetHypervisor(hv_name)
3700
          hv_class.CheckParameterSyntax(new_osp)
3701
          _CheckHVParams(self, node_list, hv_name, new_osp)
3702

    
3703
    if self.op.default_iallocator:
3704
      alloc_script = utils.FindFile(self.op.default_iallocator,
3705
                                    constants.IALLOCATOR_SEARCH_PATH,
3706
                                    os.path.isfile)
3707
      if alloc_script is None:
3708
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3709
                                   " specified" % self.op.default_iallocator,
3710
                                   errors.ECODE_INVAL)
3711

    
3712
  def Exec(self, feedback_fn):
3713
    """Change the parameters of the cluster.
3714

3715
    """
3716
    if self.op.vg_name is not None:
3717
      new_volume = self.op.vg_name
3718
      if not new_volume:
3719
        new_volume = None
3720
      if new_volume != self.cfg.GetVGName():
3721
        self.cfg.SetVGName(new_volume)
3722
      else:
3723
        feedback_fn("Cluster LVM configuration already in desired"
3724
                    " state, not changing")
3725
    if self.op.drbd_helper is not None:
3726
      new_helper = self.op.drbd_helper
3727
      if not new_helper:
3728
        new_helper = None
3729
      if new_helper != self.cfg.GetDRBDHelper():
3730
        self.cfg.SetDRBDHelper(new_helper)
3731
      else:
3732
        feedback_fn("Cluster DRBD helper already in desired state,"
3733
                    " not changing")
3734
    if self.op.hvparams:
3735
      self.cluster.hvparams = self.new_hvparams
3736
    if self.op.os_hvp:
3737
      self.cluster.os_hvp = self.new_os_hvp
3738
    if self.op.enabled_hypervisors is not None:
3739
      self.cluster.hvparams = self.new_hvparams
3740
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3741
    if self.op.beparams:
3742
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3743
    if self.op.nicparams:
3744
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3745
    if self.op.osparams:
3746
      self.cluster.osparams = self.new_osp
3747
    if self.op.ndparams:
3748
      self.cluster.ndparams = self.new_ndparams
3749

    
3750
    if self.op.candidate_pool_size is not None:
3751
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3752
      # we need to update the pool size here, otherwise the save will fail
3753
      _AdjustCandidatePool(self, [])
3754

    
3755
    if self.op.maintain_node_health is not None:
3756
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
3757
        feedback_fn("Note: CONFD was disabled at build time, node health"
3758
                    " maintenance is not useful (still enabling it)")
3759
      self.cluster.maintain_node_health = self.op.maintain_node_health
3760

    
3761
    if self.op.prealloc_wipe_disks is not None:
3762
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3763

    
3764
    if self.op.add_uids is not None:
3765
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3766

    
3767
    if self.op.remove_uids is not None:
3768
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3769

    
3770
    if self.op.uid_pool is not None:
3771
      self.cluster.uid_pool = self.op.uid_pool
3772

    
3773
    if self.op.default_iallocator is not None:
3774
      self.cluster.default_iallocator = self.op.default_iallocator
3775

    
3776
    if self.op.reserved_lvs is not None:
3777
      self.cluster.reserved_lvs = self.op.reserved_lvs
3778

    
3779
    if self.op.use_external_mip_script is not None:
3780
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
3781

    
3782
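    # helper_os applies a list of (DDM_ADD|DDM_REMOVE, os_name) modifications
    # to one of the cluster-level OS lists (hidden_os or blacklisted_os)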
    def helper_os(aname, mods, desc):
3783
      desc += " OS list"
3784
      lst = getattr(self.cluster, aname)
3785
      for key, val in mods:
3786
        if key == constants.DDM_ADD:
3787
          if val in lst:
3788
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3789
          else:
3790
            lst.append(val)
3791
        elif key == constants.DDM_REMOVE:
3792
          if val in lst:
3793
            lst.remove(val)
3794
          else:
3795
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3796
        else:
3797
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3798

    
3799
    if self.op.hidden_os:
3800
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3801

    
3802
    if self.op.blacklisted_os:
3803
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3804

    
3805
    if self.op.master_netdev:
3806
      master_params = self.cfg.GetMasterNetworkParameters()
3807
      ems = self.cfg.GetUseExternalMipScript()
3808
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3809
                  self.cluster.master_netdev)
3810
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3811
                                                       master_params, ems)
3812
      result.Raise("Could not disable the master ip")
3813
      feedback_fn("Changing master_netdev from %s to %s" %
3814
                  (master_params.netdev, self.op.master_netdev))
3815
      self.cluster.master_netdev = self.op.master_netdev
3816

    
3817
    if self.op.master_netmask:
3818
      master_params = self.cfg.GetMasterNetworkParameters()
3819
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3820
      result = self.rpc.call_node_change_master_netmask(master_params.name,
3821
                                                        master_params.netmask,
3822
                                                        self.op.master_netmask,
3823
                                                        master_params.ip,
3824
                                                        master_params.netdev)
3825
      if result.fail_msg:
3826
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3827
        feedback_fn(msg)
3828

    
3829
      self.cluster.master_netmask = self.op.master_netmask
3830

    
3831
    self.cfg.Update(self.cluster, feedback_fn)
3832

    
3833
    if self.op.master_netdev:
3834
      master_params = self.cfg.GetMasterNetworkParameters()
3835
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3836
                  self.op.master_netdev)
3837
      ems = self.cfg.GetUseExternalMipScript()
3838
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3839
                                                     master_params, ems)
3840
      if result.fail_msg:
3841
        self.LogWarning("Could not re-enable the master ip on"
3842
                        " the master, please restart manually: %s",
3843
                        result.fail_msg)
3844

    
3845

    
3846
def _UploadHelper(lu, nodes, fname):
3847
  """Helper for uploading a file and showing warnings.
3848

3849
  """
3850
  if os.path.exists(fname):
3851
    result = lu.rpc.call_upload_file(nodes, fname)
3852
    for to_node, to_result in result.items():
3853
      msg = to_result.fail_msg
3854
      if msg:
3855
        msg = ("Copy of file %s to node %s failed: %s" %
3856
               (fname, to_node, msg))
3857
        lu.proc.LogWarning(msg)
3858

    
3859

    
3860
def _ComputeAncillaryFiles(cluster, redist):
3861
  """Compute files external to Ganeti which need to be consistent.
3862

3863
  @type redist: boolean
3864
  @param redist: Whether to include files which need to be redistributed
3865

3866
  """
3867
  # Compute files for all nodes
3868
  files_all = set([
3869
    constants.SSH_KNOWN_HOSTS_FILE,
3870
    constants.CONFD_HMAC_KEY,
3871
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3872
    constants.SPICE_CERT_FILE,
3873
    constants.SPICE_CACERT_FILE,
3874
    constants.RAPI_USERS_FILE,
3875
    ])
3876

    
3877
  if not redist:
3878
    files_all.update(constants.ALL_CERT_FILES)
3879
    files_all.update(ssconf.SimpleStore().GetFileList())
3880
  else:
3881
    # we need to ship at least the RAPI certificate
3882
    files_all.add(constants.RAPI_CERT_FILE)
3883

    
3884
  if cluster.modify_etc_hosts:
3885
    files_all.add(constants.ETC_HOSTS)
3886

    
3887
  # Files which are optional, these must:
3888
  # - be present in one other category as well
3889
  # - either exist or not exist on all nodes of that category (mc, vm all)
3890
  files_opt = set([
3891
    constants.RAPI_USERS_FILE,
3892
    ])
3893

    
3894
  # Files which should only be on master candidates
3895
  files_mc = set()
3896

    
3897
  if not redist:
3898
    files_mc.add(constants.CLUSTER_CONF_FILE)
3899

    
3900
    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
3901
    # replication
3902
    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
3903

    
3904
  # Files which should only be on VM-capable nodes
3905
  files_vm = set(filename
3906
    for hv_name in cluster.enabled_hypervisors
3907
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3908

    
3909
  files_opt |= set(filename
3910
    for hv_name in cluster.enabled_hypervisors
3911
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3912

    
3913
  # Filenames in each category must be unique
3914
  all_files_set = files_all | files_mc | files_vm
3915
  assert (len(all_files_set) ==
3916
          sum(map(len, [files_all, files_mc, files_vm]))), \
3917
         "Found file listed in more than one file list"
3918

    
3919
  # Optional files must be present in one other category
3920
  assert all_files_set.issuperset(files_opt), \
3921
         "Optional file not in a different required list"
3922

    
3923
  return (files_all, files_opt, files_mc, files_vm)
3924

    
3925

    
3926
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3927
  """Distribute additional files which are part of the cluster configuration.
3928

3929
  ConfigWriter takes care of distributing the config and ssconf files, but
3930
  there are more files which should be distributed to all nodes. This function
3931
  makes sure those are copied.
3932

3933
  @param lu: calling logical unit
3934
  @param additional_nodes: list of nodes not in the config to distribute to
3935
  @type additional_vm: boolean
3936
  @param additional_vm: whether the additional nodes are vm-capable or not
3937

3938
  """
3939
  # Gather target nodes
3940
  cluster = lu.cfg.GetClusterInfo()
3941
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3942

    
3943
  online_nodes = lu.cfg.GetOnlineNodeList()
3944
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3945

    
3946
  if additional_nodes is not None:
3947
    online_nodes.extend(additional_nodes)
3948
    if additional_vm:
3949
      vm_nodes.extend(additional_nodes)
3950

    
3951
  # Never distribute to master node
3952
  for nodelist in [online_nodes, vm_nodes]:
3953
    if master_info.name in nodelist:
3954
      nodelist.remove(master_info.name)
3955

    
3956
  # Gather file lists
3957
  (files_all, _, files_mc, files_vm) = \
3958
    _ComputeAncillaryFiles(cluster, True)
3959

    
3960
  # Never re-distribute configuration file from here
3961
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3962
              constants.CLUSTER_CONF_FILE in files_vm)
3963
  assert not files_mc, "Master candidates not handled in this function"
3964

    
3965
  filemap = [
3966
    (online_nodes, files_all),
3967
    (vm_nodes, files_vm),
3968
    ]
3969

    
3970
  # Upload the files
3971
  for (node_list, files) in filemap:
3972
    for fname in files:
3973
      _UploadHelper(lu, node_list, fname)
3974

    
3975

    
3976
class LUClusterRedistConf(NoHooksLU):
3977
  """Force the redistribution of cluster configuration.
3978

3979
  This is a very simple LU.
3980

3981
  """
3982
  REQ_BGL = False
3983

    
3984
  def ExpandNames(self):
3985
    self.needed_locks = {
3986
      locking.LEVEL_NODE: locking.ALL_SET,
3987
    }
3988
    self.share_locks[locking.LEVEL_NODE] = 1
3989

    
3990
  def Exec(self, feedback_fn):
3991
    """Redistribute the configuration.
3992

3993
    """
3994
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3995
    _RedistributeAncillaryFiles(self)
3996

    
3997

    
3998
class LUClusterActivateMasterIp(NoHooksLU):
3999
  """Activate the master IP on the master node.
4000

4001
  """
4002
  def Exec(self, feedback_fn):
4003
    """Activate the master IP.
4004

4005
    """
4006
    master_params = self.cfg.GetMasterNetworkParameters()
4007
    ems = self.cfg.GetUseExternalMipScript()
4008
    result = self.rpc.call_node_activate_master_ip(master_params.name,
4009
                                                   master_params, ems)
4010
    result.Raise("Could not activate the master IP")
4011

    
4012

    
4013
class LUClusterDeactivateMasterIp(NoHooksLU):
4014
  """Deactivate the master IP on the master node.
4015

4016
  """
4017
  def Exec(self, feedback_fn):
4018
    """Deactivate the master IP.
4019

4020
    """
4021
    master_params = self.cfg.GetMasterNetworkParameters()
4022
    ems = self.cfg.GetUseExternalMipScript()
4023
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
4024
                                                     master_params, ems)
4025
    result.Raise("Could not deactivate the master IP")
4026

    
4027

    
4028
def _WaitForSync(lu, instance, disks=None, oneshot=False):
4029
  """Sleep and poll for an instance's disk to sync.
4030

4031
  """
4032
  if not instance.disks or disks is not None and not disks:
4033
    return True
4034

    
4035
  disks = _ExpandCheckDisks(instance, disks)
4036

    
4037
  if not oneshot:
4038
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
4039

    
4040
  node = instance.primary_node
4041

    
4042
  for dev in disks:
4043
    lu.cfg.SetDiskID(dev, node)
4044

    
4045
  # TODO: Convert to utils.Retry
4046

    
4047
  retries = 0
4048
  degr_retries = 10 # in seconds, as we sleep 1 second each time
4049
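  # poll the primary node for mirror status until no device reports a sync
  # in progress; between polls sleep up to 60 seconds, based on the reported
  # time estimates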
  while True:
4050
    max_time = 0
4051
    done = True
4052
    cumul_degraded = False
4053
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
4054
    msg = rstats.fail_msg
4055
    if msg:
4056
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
4057
      retries += 1
4058
      if retries >= 10:
4059
        raise errors.RemoteError("Can't contact node %s for mirror data,"
4060
                                 " aborting." % node)
4061
      time.sleep(6)
4062
      continue
4063
    rstats = rstats.payload
4064
    retries = 0
4065
    for i, mstat in enumerate(rstats):
4066
      if mstat is None:
4067
        lu.LogWarning("Can't compute data for node %s/%s",
4068
                      node, disks[i].iv_name)
4069
        continue
4070

    
4071
      cumul_degraded = (cumul_degraded or
4072
                        (mstat.is_degraded and mstat.sync_percent is None))
4073
      if mstat.sync_percent is not None:
4074
        done = False
4075
        if mstat.estimated_time is not None:
4076
          rem_time = ("%s remaining (estimated)" %
4077
                      utils.FormatSeconds(mstat.estimated_time))
4078
          max_time = mstat.estimated_time
4079
        else:
4080
          rem_time = "no time estimate"
4081
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
4082
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
4083

    
4084
    # if we're done but degraded, let's do a few small retries, to
4085
    # make sure we see a stable and not transient situation; therefore
4086
    # we force restart of the loop
4087
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
4088
      logging.info("Degraded disks found, %d retries left", degr_retries)
4089
      degr_retries -= 1
4090
      time.sleep(1)
4091
      continue
4092

    
4093
    if done or oneshot:
4094
      break
4095

    
4096
    time.sleep(min(60, max_time))
4097

    
4098
  if done:
4099
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
4100
  return not cumul_degraded
4101

    
4102

    
4103
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
4104
  """Check that mirrors are not degraded.
4105

4106
  The ldisk parameter, if True, will change the test from the
4107
  is_degraded attribute (which represents overall non-ok status for
4108
  the device(s)) to the ldisk (representing the local storage status).
4109

4110
  """
4111
  lu.cfg.SetDiskID(dev, node)
4112

    
4113
  result = True
4114

    
4115
  if on_primary or dev.AssembleOnSecondary():
4116
    rstats = lu.rpc.call_blockdev_find(node, dev)
4117