
root / lib / cmdlib.py @ 57de31c0


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc

import ganeti.masterd.instance # pylint: disable=W0611


#: Size of DRBD meta block device
DRBD_META_SIZE = 128

# States of instance
INSTANCE_UP = [constants.ADMINST_UP]
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_OFFLINE = [constants.ADMINST_OFFLINE]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcodes.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs
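
# Usage sketch (illustrative only; the LU name and the extra keyword below are
# hypothetical, not taken from this module): an LU whose Exec() wants the
# master daemon to submit follow-up jobs can return, for example,
#
#   return ResultWithJobs([[opcodes.OpClusterVerifyConfig()],
#                          [opcodes.OpClusterVerifyGroup(group_name="default")]],
#                         submitted_by="LUExample")
#
# Each inner list becomes one submitted job; the extra keyword arguments are
# kept in C{other} as additional return values alongside the job IDs.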


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc_runner):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc_runner
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      pass

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    @rtype: dict
    @return: Dictionary containing the environment that will be used for
      running the hooks for this LU. The keys of the dict must not be prefixed
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
      will extend the environment with additional variables. If no environment
      should be defined, an empty dictionary should be returned (not C{None}).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def BuildHooksNodes(self):
    """Build list of nodes to run LU's hooks.

    @rtype: tuple; (list, list)
    @return: Tuple containing a list of node names on which the hook
      should run before the execution and a list of node names on which the
      hook should run after the execution. No nodes should be returned as an
      empty list (and not None).
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
      will not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused-argument and
    # could-be-a-function warnings
    # pylint: disable=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False,
                          level=locking.LEVEL_NODE):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances
    @param level: Which lock level to use for locking nodes

    """
    assert level in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
      self.needed_locks[level] = wanted_nodes
    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
      self.needed_locks[level].extend(wanted_nodes)
    else:
      raise errors.ProgrammerError("Unknown recalculation mode")

    del self.recalculate_locks[level]
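
# Illustrative sketch of how the two helpers above are typically combined
# (the LU below is hypothetical, not part of this module):
#
#   class LUInstanceExample(LogicalUnit):
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()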


class NoHooksLU(LogicalUnit): # pylint: disable=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")

  def BuildHooksNodes(self):
    """Empty BuildHooksNodes for NoHooksLU.

    """
    raise AssertionError("BuildHooksNodes called for NoHooksLU")


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    pass

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


class _QueryBase:
  """Base for query utility classes.

  """
  #: Attribute holding field definitions
  FIELDS = None

  def __init__(self, qfilter, fields, use_locking):
    """Initializes this class.

    """
    self.use_locking = use_locking

    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
                             namefield="name")
    self.requested_data = self.query.RequestedData()
    self.names = self.query.RequestedNames()

    # Sort only if no names were requested
    self.sort_by_name = not self.names

    self.do_locking = None
    self.wanted = None

  def _GetNames(self, lu, all_names, lock_level):
    """Helper function to determine names asked for in the query.

    """
    if self.do_locking:
      names = lu.owned_locks(lock_level)
    else:
      names = all_names

    if self.wanted == locking.ALL_SET:
      assert not self.names
      # caller didn't specify names, so ordering is not important
      return utils.NiceSort(names)

    # caller specified names and we must keep the same order
    assert self.names
    assert not self.do_locking or lu.glm.is_owned(lock_level)

    missing = set(self.wanted).difference(names)
    if missing:
      raise errors.OpExecError("Some items were removed before retrieving"
                               " their data: %s" % missing)

    # Return expanded names
    return self.wanted

  def ExpandNames(self, lu):
    """Expand names for this query.

    See L{LogicalUnit.ExpandNames}.

    """
    raise NotImplementedError()

  def DeclareLocks(self, lu, level):
    """Declare locks for this query.

    See L{LogicalUnit.DeclareLocks}.

    """
    raise NotImplementedError()

  def _GetQueryData(self, lu):
    """Collects all data for this query.

    @return: Query data object

    """
    raise NotImplementedError()

  def NewStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
                                  sort_by_name=self.sort_by_name)

  def OldStyleQuery(self, lu):
    """Collect data and execute query.

    """
    return self.query.OldStyleQuery(self._GetQueryData(lu),
                                    sort_by_name=self.sort_by_name)


def _ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)
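
# Illustrative sketch (hypothetical LU, not part of this module): an LU that
# only reads cluster state can declare every lock level as shared, e.g.
#
#   def ExpandNames(self):
#     self.share_locks = _ShareAll()
#     self.needed_locks = {
#       locking.LEVEL_NODE: locking.ALL_SET,
#       }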


def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups"
                               " are '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def _SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if nodes:
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
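
# Behaviour sketch for the helper above (the parameter names and values are
# made up for illustration):
#
#   old = {"kernel_path": "/vmlinuz", "root_path": "/dev/sda1"}
#   upd = {"kernel_path": constants.VALUE_DEFAULT, "serial_console": True}
#   _GetUpdatedParams(old, upd)
#   # -> {"root_path": "/dev/sda1", "serial_console": True}
#
# i.e. VALUE_DEFAULT (and None, when use_none=True) removes a key so the
# cluster-level default applies again, while any other value simply overwrites.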


def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
         "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  owned = lu.owned_locks(level)
  if not owned:
    # Not owning any lock at this level, do nothing
    pass

  elif should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in owned:
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
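
# Usage sketch (hypothetical values): once an LU has narrowed down the nodes
# it really needs, it can drop the remaining node locks early, e.g.
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[instance.primary_node] +
#                      list(instance.secondary_nodes))
#
# or release everything at one level with _ReleaseLocks(self,
# locking.LEVEL_NODE).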


def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)
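
# Shape of the result (names are made up): for an instance "inst1" with a
# logical volume "xenvg/disk0" on "node1", the returned dict would contain
#
#   {("node1", "xenvg/disk0"): "inst1"}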


def _RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except:
    # pylint: disable=W0702
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = "can't use instance from outside %s states" % ", ".join(req_states)
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance %s is marked to be %s, %s" %
                               (instance, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode = instance.primary_node
    ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
    ins_l.Raise("Can't contact node %s for instance information" % pnode,
                prereq=True, ecode=errors.ECODE_ENVIRON)

    if instance.name in ins_l.payload:
      raise errors.OpPrereqError("Instance %s is running, %s" %
                                 (instance.name, msg), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name, tags):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @type tags: list
  @param tags: list of instance tags as strings
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  if not tags:
    tags = []

  env["INSTANCE_TAGS"] = " ".join(tags)

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
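
# Resulting environment sketch (values are illustrative): for a single-NIC,
# single-disk instance the dict built above contains entries such as
#
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#    "INSTANCE_DISK_COUNT": 1,
#    "INSTANCE_DISK0_SIZE": 1024,
#    ...}
#
# The hooks runner later prefixes every key with "GANETI_" (see
# LogicalUnit.BuildHooksEnv above).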


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUInstanceQueryData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    "name": instance.name,
    "primary_node": instance.primary_node,
    "secondary_nodes": instance.secondary_nodes,
    "os_type": instance.os,
    "status": instance.admin_state,
    "memory": bep[constants.BE_MEMORY],
    "vcpus": bep[constants.BE_VCPUS],
    "nics": _NICListToTuple(lu, instance.nics),
    "disk_template": instance.disk_template,
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
    "bep": bep,
    "hvp": hvp,
    "hypervisor_name": instance.hypervisor,
    "tags": instance.tags,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, then the LU's opcode's iallocator slot is filled with the
  cluster-wide default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  iallocator = getattr(lu.op, iallocator_slot, None)

  if node is not None and iallocator is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif node is None and iallocator is None:
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator")


def _GetDefaultIAllocator(cfg, iallocator):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type iallocator: string or None
  @param iallocator: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not iallocator:
    # Use default iallocator
    iallocator = cfg.GetDefaultIAllocator()

  if not iallocator:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return iallocator


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    return master_params.name


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101

  def _ErrorIf(self, cond, ecode, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = (bool(cond)
            or self.op.debug_simulate_errors) # pylint: disable=E1101

    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    (_, etxt, _) = ecode
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING

    if cond:
      self._Error(ecode, *args, **kwargs)

    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
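
# Usage sketch inside a verify LU's Exec (the condition and message below are
# illustrative only):
#
#   self._ErrorIf(len(errs) > 0, constants.CV_ECLUSTERCFG, None,
#                 "configuration problems found: %s", utils.CommaJoin(errs))
#
# When the error code is listed in the opcode's ignore_errors, the report is
# demoted to a warning and does not mark the operation as failed.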
1516

    
1517

    
1518
class LUClusterVerify(NoHooksLU):
1519
  """Submits all jobs necessary to verify the cluster.
1520

1521
  """
1522
  REQ_BGL = False
1523

    
1524
  def ExpandNames(self):
1525
    self.needed_locks = {}
1526

    
1527
  def Exec(self, feedback_fn):
1528
    jobs = []
1529

    
1530
    if self.op.group_name:
1531
      groups = [self.op.group_name]
1532
      depends_fn = lambda: None
1533
    else:
1534
      groups = self.cfg.GetNodeGroupList()
1535

    
1536
      # Verify global configuration
1537
      jobs.append([
1538
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1539
        ])
1540

    
1541
      # Always depend on global verification
1542
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
                                            ignore_errors=self.op.ignore_errors,
                                            depends=depends_fn())]
                for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1563
  """Verifies the cluster config.
1564

1565
  """
1566
  REQ_BGL = True
1567

    
1568
  def _VerifyHVP(self, hvp_data):
1569
    """Verifies locally the syntax of the hypervisor parameters.
1570

1571
    """
1572
    for item, hv_name, hv_params in hvp_data:
1573
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1574
             (item, hv_name))
1575
      try:
1576
        hv_class = hypervisor.GetHypervisor(hv_name)
1577
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1578
        hv_class.CheckParameterSyntax(hv_params)
1579
      except errors.GenericError, err:
1580
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1581

    
1582
  def ExpandNames(self):
1583
    # Information can be safely retrieved as the BGL is acquired in exclusive
1584
    # mode
1585
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1586
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1587
    self.all_node_info = self.cfg.GetAllNodesInfo()
1588
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1589
    self.needed_locks = {}
1590

    
1591
  def Exec(self, feedback_fn):
1592
    """Verify integrity of cluster, performing various test on nodes.
1593

1594
    """
1595
    self.bad = False
1596
    self._feedback_fn = feedback_fn
1597

    
1598
    feedback_fn("* Verifying cluster config")
1599

    
1600
    for msg in self.cfg.VerifyConfig():
1601
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1602

    
1603
    feedback_fn("* Verifying cluster certificate files")
1604

    
1605
    for cert_filename in constants.ALL_CERT_FILES:
1606
      (errcode, msg) = _VerifyCertificate(cert_filename)
1607
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1608

    
1609
    feedback_fn("* Verifying hypervisor parameters")
1610

    
1611
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1612
                                                self.all_inst_info.values()))
1613

    
1614
    feedback_fn("* Verifying all nodes belong to an existing group")
1615

    
1616
    # We do this verification here because, should this bogus circumstance
1617
    # occur, it would never be caught by VerifyGroup, which only acts on
1618
    # nodes/instances reachable from existing node groups.
1619

    
1620
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1621
                         if node.group not in self.all_group_info)
1622

    
1623
    dangling_instances = {}
1624
    no_node_instances = []
1625

    
1626
    for inst in self.all_inst_info.values():
1627
      if inst.primary_node in dangling_nodes:
1628
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1629
      elif inst.primary_node not in self.all_node_info:
1630
        no_node_instances.append(inst.name)
1631

    
1632
    pretty_dangling = [
1633
        "%s (%s)" %
1634
        (node.name,
1635
         utils.CommaJoin(dangling_instances.get(node.name,
1636
                                                ["no instances"])))
1637
        for node in dangling_nodes]
1638

    
1639
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1640
                  None,
1641
                  "the following nodes (and their instances) belong to a non"
1642
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1643

    
1644
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1645
                  None,
1646
                  "the following instances have a non-existing primary-node:"
1647
                  " %s", utils.CommaJoin(no_node_instances))
1648

    
1649
    return not self.bad
1650

    
1651

    
1652
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1653
  """Verifies the status of a node group.
1654

1655
  """
1656
  HPATH = "cluster-verify"
1657
  HTYPE = constants.HTYPE_CLUSTER
1658
  REQ_BGL = False
1659

    
1660
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1661

    
1662
  class NodeImage(object):
1663
    """A class representing the logical and physical status of a node.
1664

1665
    @type name: string
1666
    @ivar name: the node name to which this object refers
1667
    @ivar volumes: a structure as returned from
1668
        L{ganeti.backend.GetVolumeList} (runtime)
1669
    @ivar instances: a list of running instances (runtime)
1670
    @ivar pinst: list of configured primary instances (config)
1671
    @ivar sinst: list of configured secondary instances (config)
1672
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1673
        instances for which this node is secondary (config)
1674
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1675
    @ivar dfree: free disk, as reported by the node (runtime)
1676
    @ivar offline: the offline status (config)
1677
    @type rpc_fail: boolean
1678
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1679
        not whether the individual keys were correct) (runtime)
1680
    @type lvm_fail: boolean
1681
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1682
    @type hyp_fail: boolean
1683
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1684
    @type ghost: boolean
1685
    @ivar ghost: whether this is a known node or not (config)
1686
    @type os_fail: boolean
1687
    @ivar os_fail: whether the RPC call didn't return valid OS data
1688
    @type oslist: list
1689
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1690
    @type vm_capable: boolean
1691
    @ivar vm_capable: whether the node can host instances
1692

1693
    """
1694
    def __init__(self, offline=False, name=None, vm_capable=True):
1695
      self.name = name
1696
      self.volumes = {}
1697
      self.instances = []
1698
      self.pinst = []
1699
      self.sinst = []
1700
      self.sbp = {}
1701
      self.mfree = 0
1702
      self.dfree = 0
1703
      self.offline = offline
1704
      self.vm_capable = vm_capable
1705
      self.rpc_fail = False
1706
      self.lvm_fail = False
1707
      self.hyp_fail = False
1708
      self.ghost = False
1709
      self.os_fail = False
1710
      self.oslist = {}
1711

    
1712
  def ExpandNames(self):
1713
    # This raises errors.OpPrereqError on its own:
1714
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1715

    
1716
    # Get instances in node group; this is unsafe and needs verification later
1717
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
1718

    
1719
    self.needed_locks = {
1720
      locking.LEVEL_INSTANCE: inst_names,
1721
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1722
      locking.LEVEL_NODE: [],
1723
      }
1724

    
1725
    self.share_locks = _ShareAll()
1726

    
1727
  def DeclareLocks(self, level):
1728
    if level == locking.LEVEL_NODE:
1729
      # Get members of node group; this is unsafe and needs verification later
1730
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1731

    
1732
      all_inst_info = self.cfg.GetAllInstancesInfo()
1733

    
1734
      # In Exec(), we warn about mirrored instances that have primary and
1735
      # secondary living in separate node groups. To fully verify that
1736
      # volumes for these instances are healthy, we will need to do an
1737
      # extra call to their secondaries. We ensure here those nodes will
1738
      # be locked.
1739
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1740
        # Important: access only the instances whose lock is owned
1741
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1742
          nodes.update(all_inst_info[inst].secondary_nodes)
1743

    
1744
      self.needed_locks[locking.LEVEL_NODE] = nodes
1745

    
1746
  def CheckPrereq(self):
1747
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1748
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1749

    
1750
    group_nodes = set(self.group_info.members)
1751
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
1752

    
1753
    unlocked_nodes = \
1754
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1755

    
1756
    unlocked_instances = \
1757
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1758

    
1759
    if unlocked_nodes:
1760
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1761
                                 utils.CommaJoin(unlocked_nodes))
1762

    
1763
    if unlocked_instances:
1764
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1765
                                 utils.CommaJoin(unlocked_instances))
1766

    
1767
    self.all_node_info = self.cfg.GetAllNodesInfo()
1768
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1769

    
1770
    self.my_node_names = utils.NiceSort(group_nodes)
1771
    self.my_inst_names = utils.NiceSort(group_instances)
1772

    
1773
    self.my_node_info = dict((name, self.all_node_info[name])
1774
                             for name in self.my_node_names)
1775

    
1776
    self.my_inst_info = dict((name, self.all_inst_info[name])
1777
                             for name in self.my_inst_names)
1778

    
1779
    # We detect here the nodes that will need the extra RPC calls for verifying
1780
    # split LV volumes; they should be locked.
1781
    extra_lv_nodes = set()
1782

    
1783
    for inst in self.my_inst_info.values():
1784
      if inst.disk_template in constants.DTS_INT_MIRROR:
1785
        group = self.my_node_info[inst.primary_node].group
1786
        for nname in inst.secondary_nodes:
1787
          if self.all_node_info[nname].group != group:
1788
            extra_lv_nodes.add(nname)
1789

    
1790
    unlocked_lv_nodes = \
1791
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1792

    
1793
    if unlocked_lv_nodes:
1794
      raise errors.OpPrereqError("these nodes could be locked: %s" %
1795
                                 utils.CommaJoin(unlocked_lv_nodes))
1796
    self.extra_lv_nodes = list(extra_lv_nodes)
1797

    
1798
  def _VerifyNode(self, ninfo, nresult):
1799
    """Perform some basic validation on data returned from a node.
1800

1801
      - check the result data structure is well formed and has all the
1802
        mandatory fields
1803
      - check ganeti version
1804

1805
    @type ninfo: L{objects.Node}
1806
    @param ninfo: the node to check
1807
    @param nresult: the results from the node
1808
    @rtype: boolean
1809
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)
1811

1812
    """
1813
    node = ninfo.name
1814
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1815

    
1816
    # main result, nresult should be a non-empty dict
1817
    test = not nresult or not isinstance(nresult, dict)
1818
    _ErrorIf(test, constants.CV_ENODERPC, node,
1819
                  "unable to verify node: no data returned")
1820
    if test:
1821
      return False
1822

    
1823
    # compares ganeti version
1824
    local_version = constants.PROTOCOL_VERSION
1825
    remote_version = nresult.get("version", None)
1826
    test = not (remote_version and
1827
                isinstance(remote_version, (list, tuple)) and
1828
                len(remote_version) == 2)
1829
    _ErrorIf(test, constants.CV_ENODERPC, node,
1830
             "connection to node returned invalid data")
1831
    if test:
1832
      return False
1833

    
1834
    test = local_version != remote_version[0]
1835
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1836
             "incompatible protocol versions: master %s,"
1837
             " node %s", local_version, remote_version[0])
1838
    if test:
1839
      return False
1840

    
1841
    # node seems compatible, we can actually try to look into its results
1842

    
1843
    # full package version
1844
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1845
                  constants.CV_ENODEVERSION, node,
1846
                  "software version mismatch: master %s, node %s",
1847
                  constants.RELEASE_VERSION, remote_version[1],
1848
                  code=self.ETYPE_WARNING)
1849

    
1850
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1851
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1852
      for hv_name, hv_result in hyp_result.iteritems():
1853
        test = hv_result is not None
1854
        _ErrorIf(test, constants.CV_ENODEHV, node,
1855
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1856

    
1857
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1858
    if ninfo.vm_capable and isinstance(hvp_result, list):
1859
      for item, hv_name, hv_result in hvp_result:
1860
        _ErrorIf(True, constants.CV_ENODEHV, node,
1861
                 "hypervisor %s parameter verify failure (source %s): %s",
1862
                 hv_name, item, hv_result)
1863

    
1864
    test = nresult.get(constants.NV_NODESETUP,
1865
                       ["Missing NODESETUP results"])
1866
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1867
             "; ".join(test))
1868

    
1869
    return True
1870

    
1871
  def _VerifyNodeTime(self, ninfo, nresult,
1872
                      nvinfo_starttime, nvinfo_endtime):
1873
    """Check the node time.
1874

1875
    @type ninfo: L{objects.Node}
1876
    @param ninfo: the node to check
1877
    @param nresult: the remote results for the node
1878
    @param nvinfo_starttime: the start time of the RPC call
1879
    @param nvinfo_endtime: the end time of the RPC call
1880

1881
    """
1882
    node = ninfo.name
1883
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1884

    
1885
    ntime = nresult.get(constants.NV_TIME, None)
1886
    try:
1887
      ntime_merged = utils.MergeTime(ntime)
1888
    except (ValueError, TypeError):
1889
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1890
      return
1891

    
1892
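    # The RPC round trip only bounds the master's clock to the
    # [nvinfo_starttime, nvinfo_endtime] window, so the node is flagged only
    # when its reported time falls outside that window by more than the
    # allowed skew.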
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1893
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1894
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1895
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1896
    else:
1897
      ntime_diff = None
1898

    
1899
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1900
             "Node time diverges by at least %s from master node time",
1901
             ntime_diff)
1902

    
1903
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1904
    """Check the node LVM results.
1905

1906
    @type ninfo: L{objects.Node}
1907
    @param ninfo: the node to check
1908
    @param nresult: the remote results for the node
1909
    @param vg_name: the configured VG name
1910

1911
    """
1912
    if vg_name is None:
1913
      return
1914

    
1915
    node = ninfo.name
1916
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1917

    
1918
    # checks vg existence and size > 20G
1919
    vglist = nresult.get(constants.NV_VGLIST, None)
1920
    test = not vglist
1921
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1922
    if not test:
1923
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1924
                                            constants.MIN_VG_SIZE)
1925
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1926

    
1927
    # check pv names
1928
    pvlist = nresult.get(constants.NV_PVLIST, None)
1929
    test = pvlist is None
1930
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
1931
    if not test:
1932
      # check that ':' is not present in PV names, since it's a
1933
      # special character for lvcreate (denotes the range of PEs to
1934
      # use on the PV)
1935
      for _, pvname, owner_vg in pvlist:
1936
        test = ":" in pvname
1937
        _ErrorIf(test, constants.CV_ENODELVM, node,
1938
                 "Invalid character ':' in PV '%s' of VG '%s'",
1939
                 pvname, owner_vg)
1940

    
1941
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1942
    """Check the node bridges.
1943

1944
    @type ninfo: L{objects.Node}
1945
    @param ninfo: the node to check
1946
    @param nresult: the remote results for the node
1947
    @param bridges: the expected list of bridges
1948

1949
    """
1950
    if not bridges:
1951
      return
1952

    
1953
    node = ninfo.name
1954
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1955

    
1956
    missing = nresult.get(constants.NV_BRIDGES, None)
1957
    test = not isinstance(missing, list)
1958
    _ErrorIf(test, constants.CV_ENODENET, node,
1959
             "did not return valid bridge information")
1960
    if not test:
1961
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1962
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1963

    
1964
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1965
    """Check the results of user scripts presence and executability on the node
1966

1967
    @type ninfo: L{objects.Node}
1968
    @param ninfo: the node to check
1969
    @param nresult: the remote results for the node
1970

1971
    """
1972
    node = ninfo.name
1973

    
1974
    test = constants.NV_USERSCRIPTS not in nresult
1975
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1976
                  "did not return user scripts information")
1977

    
1978
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1979
    if not test:
1980
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1981
                    "user scripts not present or not executable: %s" %
1982
                    utils.CommaJoin(sorted(broken_scripts)))
1983

    
1984
  def _VerifyNodeNetwork(self, ninfo, nresult):
1985
    """Check the node network connectivity results.
1986

1987
    @type ninfo: L{objects.Node}
1988
    @param ninfo: the node to check
1989
    @param nresult: the remote results for the node
1990

1991
    """
1992
    node = ninfo.name
1993
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1994

    
1995
    test = constants.NV_NODELIST not in nresult
1996
    _ErrorIf(test, constants.CV_ENODESSH, node,
1997
             "node hasn't returned node ssh connectivity data")
1998
    if not test:
1999
      if nresult[constants.NV_NODELIST]:
2000
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2001
          _ErrorIf(True, constants.CV_ENODESSH, node,
2002
                   "ssh communication with node '%s': %s", a_node, a_msg)
2003

    
2004
    test = constants.NV_NODENETTEST not in nresult
2005
    _ErrorIf(test, constants.CV_ENODENET, node,
2006
             "node hasn't returned node tcp connectivity data")
2007
    if not test:
2008
      if nresult[constants.NV_NODENETTEST]:
2009
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2010
        for anode in nlist:
2011
          _ErrorIf(True, constants.CV_ENODENET, node,
2012
                   "tcp communication with node '%s': %s",
2013
                   anode, nresult[constants.NV_NODENETTEST][anode])
2014

    
2015
    test = constants.NV_MASTERIP not in nresult
2016
    _ErrorIf(test, constants.CV_ENODENET, node,
2017
             "node hasn't returned node master IP reachability data")
2018
    if not test:
2019
      if not nresult[constants.NV_MASTERIP]:
2020
        if node == self.master_node:
2021
          msg = "the master node cannot reach the master IP (not configured?)"
2022
        else:
2023
          msg = "cannot reach the master IP"
2024
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
2025

    
2026
  def _VerifyInstance(self, instance, instanceconfig, node_image,
2027
                      diskstatus):
2028
    """Verify an instance.
2029

2030
    This function checks to see if the required block devices are
2031
    available on the instance's node.
2032

2033
    """
2034
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2035
    node_current = instanceconfig.primary_node
2036

    
2037
    node_vol_should = {}
2038
    instanceconfig.MapLVsByNode(node_vol_should)
2039

    
2040
    for node in node_vol_should:
2041
      n_img = node_image[node]
2042
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2043
        # ignore missing volumes on offline or broken nodes
2044
        continue
2045
      for volume in node_vol_should[node]:
2046
        test = volume not in n_img.volumes
2047
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
2048
                 "volume %s missing on node %s", volume, node)
2049

    
2050
    if instanceconfig.admin_state == constants.ADMINST_UP:
2051
      pri_img = node_image[node_current]
2052
      test = instance not in pri_img.instances and not pri_img.offline
2053
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2054
               "instance not running on its primary node %s",
2055
               node_current)
2056

    
2057
    diskdata = [(nname, success, status, idx)
2058
                for (nname, disks) in diskstatus.items()
2059
                for idx, (success, status) in enumerate(disks)]
2060

    
2061
    for nname, success, bdev_status, idx in diskdata:
2062
      # the 'ghost node' construction in Exec() ensures that we have a
2063
      # node here
2064
      snode = node_image[nname]
2065
      bad_snode = snode.ghost or snode.offline
2066
      _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
2067
               not success and not bad_snode,
2068
               constants.CV_EINSTANCEFAULTYDISK, instance,
2069
               "couldn't retrieve status for disk/%s on %s: %s",
2070
               idx, nname, bdev_status)
2071
      _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
2072
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
2073
               constants.CV_EINSTANCEFAULTYDISK, instance,
2074
               "disk/%s on %s is faulty", idx, nname)
2075

    
2076
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2077
    """Verify if there are any unknown volumes in the cluster.
2078

2079
    The .os, .swap and backup volumes are ignored. All other volumes are
2080
    reported as unknown.
2081

2082
    @type reserved: L{ganeti.utils.FieldSet}
2083
    @param reserved: a FieldSet of reserved volume names
2084

2085
    """
2086
    for node, n_img in node_image.items():
2087
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2088
        # skip non-healthy nodes
2089
        continue
2090
      for volume in n_img.volumes:
2091
        test = ((node not in node_vol_should or
2092
                volume not in node_vol_should[node]) and
2093
                not reserved.Matches(volume))
2094
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2095
                      "volume %s is unknown", volume)
2096

    
2097
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2098
    """Verify N+1 Memory Resilience.
2099

2100
    Check that if one single node dies we can still start all the
2101
    instances it was primary for.
2102

2103
    """
2104
    cluster_info = self.cfg.GetClusterInfo()
2105
    for node, n_img in node_image.items():
2106
      # This code checks that every node which is now listed as
2107
      # secondary has enough memory to host all instances it is
2108
      # supposed to should a single other node in the cluster fail.
2109
      # FIXME: not ready for failover to an arbitrary node
2110
      # FIXME: does not support file-backed instances
2111
      # WARNING: we currently take into account down instances as well
2112
      # as up ones, considering that even if they're down someone
2113
      # might want to start them even in the event of a node failure.
2114
      if n_img.offline:
2115
        # we're skipping offline nodes from the N+1 warning, since
2116
        # most likely we don't have good memory information from them;
2117
        # we already list instances living on such nodes, and that's
2118
        # enough warning
2119
        continue
2120
      for prinode, instances in n_img.sbp.items():
2121
        needed_mem = 0
2122
        for instance in instances:
2123
          bep = cluster_info.FillBE(instance_cfg[instance])
2124
          if bep[constants.BE_AUTO_BALANCE]:
2125
            needed_mem += bep[constants.BE_MEMORY]
2126
        test = n_img.mfree < needed_mem
2127
        self._ErrorIf(test, constants.CV_ENODEN1, node,
2128
                      "not enough memory to accomodate instance failovers"
2129
                      " should node %s fail (%dMiB needed, %dMiB available)",
2130
                      prinode, needed_mem, n_img.mfree)
2131

    
2132
  @classmethod
2133
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2134
                   (files_all, files_opt, files_mc, files_vm)):
2135
    """Verifies file checksums collected from all nodes.
2136

2137
    @param errorif: Callback for reporting errors
2138
    @param nodeinfo: List of L{objects.Node} objects
2139
    @param master_node: Name of master node
2140
    @param all_nvinfo: RPC results
2141

2142
    """
2143
    # Define functions determining which nodes to consider for a file
2144
    files2nodefn = [
2145
      (files_all, None),
2146
      (files_mc, lambda node: (node.master_candidate or
2147
                               node.name == master_node)),
2148
      (files_vm, lambda node: node.vm_capable),
2149
      ]
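    # files_opt is deliberately left out of this mapping; optional files are
    # expected to also appear in one of the three sets above and only get the
    # relaxed all-or-nothing treatment further down.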
2150

    
2151
    # Build mapping from filename to list of nodes which should have the file
2152
    nodefiles = {}
2153
    for (files, fn) in files2nodefn:
2154
      if fn is None:
2155
        filenodes = nodeinfo
2156
      else:
2157
        filenodes = filter(fn, nodeinfo)
2158
      nodefiles.update((filename,
2159
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2160
                       for filename in files)
2161

    
2162
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2163

    
2164
    fileinfo = dict((filename, {}) for filename in nodefiles)
2165
    ignore_nodes = set()
2166

    
2167
    for node in nodeinfo:
2168
      if node.offline:
2169
        ignore_nodes.add(node.name)
2170
        continue
2171

    
2172
      nresult = all_nvinfo[node.name]
2173

    
2174
      if nresult.fail_msg or not nresult.payload:
2175
        node_files = None
2176
      else:
2177
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2178

    
2179
      test = not (node_files and isinstance(node_files, dict))
2180
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
2181
              "Node did not return file checksum data")
2182
      if test:
2183
        ignore_nodes.add(node.name)
2184
        continue
2185

    
2186
      # Build per-checksum mapping from filename to nodes having it
2187
      for (filename, checksum) in node_files.items():
2188
        assert filename in nodefiles
2189
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2190

    
2191
    for (filename, checksums) in fileinfo.items():
2192
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2193

    
2194
      # Nodes having the file
2195
      with_file = frozenset(node_name
2196
                            for nodes in fileinfo[filename].values()
2197
                            for node_name in nodes) - ignore_nodes
2198

    
2199
      expected_nodes = nodefiles[filename] - ignore_nodes
2200

    
2201
      # Nodes missing file
2202
      missing_file = expected_nodes - with_file
2203

    
2204
      if filename in files_opt:
2205
        # All or no nodes
2206
        errorif(missing_file and missing_file != expected_nodes,
2207
                constants.CV_ECLUSTERFILECHECK, None,
2208
                "File %s is optional, but it must exist on all or no"
2209
                " nodes (not found on %s)",
2210
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2211
      else:
2212
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2213
                "File %s is missing from node(s) %s", filename,
2214
                utils.CommaJoin(utils.NiceSort(missing_file)))
2215

    
2216
        # Warn if a node has a file it shouldn't
2217
        unexpected = with_file - expected_nodes
2218
        errorif(unexpected,
2219
                constants.CV_ECLUSTERFILECHECK, None,
2220
                "File %s should not exist on node(s) %s",
2221
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2222

    
2223
      # See if there are multiple versions of the file
2224
      test = len(checksums) > 1
2225
      if test:
2226
        variants = ["variant %s on %s" %
2227
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2228
                    for (idx, (checksum, nodes)) in
2229
                      enumerate(sorted(checksums.items()))]
2230
      else:
2231
        variants = []
2232

    
2233
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2234
              "File %s found with %s different checksums (%s)",
2235
              filename, len(checksums), "; ".join(variants))
2236

    
2237
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2238
                      drbd_map):
2239
    """Verifies and the node DRBD status.
2240

2241
    @type ninfo: L{objects.Node}
2242
    @param ninfo: the node to check
2243
    @param nresult: the remote results for the node
2244
    @param instanceinfo: the dict of instances
2245
    @param drbd_helper: the configured DRBD usermode helper
2246
    @param drbd_map: the DRBD map as returned by
2247
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2248

2249
    """
2250
    node = ninfo.name
2251
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2252

    
2253
    if drbd_helper:
2254
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2255
      test = (helper_result is None)
2256
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2257
               "no drbd usermode helper returned")
2258
      if helper_result:
2259
        status, payload = helper_result
2260
        test = not status
2261
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2262
                 "drbd usermode helper check unsuccessful: %s", payload)
2263
        test = status and (payload != drbd_helper)
2264
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2265
                 "wrong drbd usermode helper: %s", payload)
2266

    
2267
    # compute the DRBD minors
2268
    node_drbd = {}
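    # node_drbd maps each minor known to the configuration to
    # (instance name, whether that instance is expected to be running)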
2269
    for minor, instance in drbd_map[node].items():
2270
      test = instance not in instanceinfo
2271
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2272
               "ghost instance '%s' in temporary DRBD map", instance)
2273
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
2276
      if test:
2277
        node_drbd[minor] = (instance, False)
2278
      else:
2279
        instance = instanceinfo[instance]
2280
        node_drbd[minor] = (instance.name,
2281
                            instance.admin_state == constants.ADMINST_UP)
2282

    
2283
    # and now check them
2284
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2285
    test = not isinstance(used_minors, (tuple, list))
2286
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2287
             "cannot parse drbd status file: %s", str(used_minors))
2288
    if test:
2289
      # we cannot check drbd status
2290
      return
2291

    
2292
    for minor, (iname, must_exist) in node_drbd.items():
2293
      test = minor not in used_minors and must_exist
2294
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2295
               "drbd minor %d of instance %s is not active", minor, iname)
2296
    for minor in used_minors:
2297
      test = minor not in node_drbd
2298
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2299
               "unallocated drbd minor %d is in use", minor)
2300

    
2301
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2302
    """Builds the node OS structures.
2303

2304
    @type ninfo: L{objects.Node}
2305
    @param ninfo: the node to check
2306
    @param nresult: the remote results for the node
2307
    @param nimg: the node image object
2308

2309
    """
2310
    node = ninfo.name
2311
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2312

    
2313
    remote_os = nresult.get(constants.NV_OSLIST, None)
2314
    test = (not isinstance(remote_os, list) or
2315
            not compat.all(isinstance(v, list) and len(v) == 7
2316
                           for v in remote_os))
2317

    
2318
    _ErrorIf(test, constants.CV_ENODEOS, node,
2319
             "node hasn't returned valid OS data")
2320

    
2321
    nimg.os_fail = test
2322

    
2323
    if test:
2324
      return
2325

    
2326
    os_dict = {}
2327

    
2328
    for (name, os_path, status, diagnose,
2329
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2330

    
2331
      if name not in os_dict:
2332
        os_dict[name] = []
2333

    
2334
      # parameters is a list of lists instead of list of tuples due to
2335
      # JSON lacking a real tuple type, fix it:
2336
      parameters = [tuple(v) for v in parameters]
2337
      os_dict[name].append((os_path, status, diagnose,
2338
                            set(variants), set(parameters), set(api_ver)))
2339

    
2340
    nimg.oslist = os_dict
2341

    
2342
  def _VerifyNodeOS(self, ninfo, nimg, base):
2343
    """Verifies the node OS list.
2344

2345
    @type ninfo: L{objects.Node}
2346
    @param ninfo: the node to check
2347
    @param nimg: the node image object
2348
    @param base: the 'template' node we match against (e.g. from the master)
2349

2350
    """
2351
    node = ninfo.name
2352
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2353

    
2354
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2355

    
2356
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2357
    for os_name, os_data in nimg.oslist.items():
2358
      assert os_data, "Empty OS status for OS %s?!" % os_name
2359
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2360
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2361
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2362
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2363
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2364
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2365
      # comparisons with the 'base' image
2366
      test = os_name not in base.oslist
2367
      _ErrorIf(test, constants.CV_ENODEOS, node,
2368
               "Extra OS %s not present on reference node (%s)",
2369
               os_name, base.name)
2370
      if test:
2371
        continue
2372
      assert base.oslist[os_name], "Base node has empty OS status?"
2373
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2374
      if not b_status:
2375
        # base OS is invalid, skipping
2376
        continue
2377
      for kind, a, b in [("API version", f_api, b_api),
2378
                         ("variants list", f_var, b_var),
2379
                         ("parameters", beautify_params(f_param),
2380
                          beautify_params(b_param))]:
2381
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2382
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2383
                 kind, os_name, base.name,
2384
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2385

    
2386
    # check any missing OSes
2387
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2388
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2389
             "OSes present on reference node %s but missing on this node: %s",
2390
             base.name, utils.CommaJoin(missing))
2391

    
2392
  def _VerifyOob(self, ninfo, nresult):
2393
    """Verifies out of band functionality of a node.
2394

2395
    @type ninfo: L{objects.Node}
2396
    @param ninfo: the node to check
2397
    @param nresult: the remote results for the node
2398

2399
    """
2400
    node = ninfo.name
2401
    # We just have to verify the paths on master and/or master candidates
2402
    # as the oob helper is invoked on the master
2403
    if ((ninfo.master_candidate or ninfo.master_capable) and
2404
        constants.NV_OOB_PATHS in nresult):
2405
      for path_result in nresult[constants.NV_OOB_PATHS]:
2406
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2407

    
2408
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2409
    """Verifies and updates the node volume data.
2410

2411
    This function will update a L{NodeImage}'s internal structures
2412
    with data from the remote call.
2413

2414
    @type ninfo: L{objects.Node}
2415
    @param ninfo: the node to check
2416
    @param nresult: the remote results for the node
2417
    @param nimg: the node image object
2418
    @param vg_name: the configured VG name
2419

2420
    """
2421
    node = ninfo.name
2422
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2423

    
2424
    nimg.lvm_fail = True
2425
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2426
    if vg_name is None:
2427
      pass
2428
    elif isinstance(lvdata, basestring):
2429
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2430
               utils.SafeEncode(lvdata))
2431
    elif not isinstance(lvdata, dict):
2432
      _ErrorIf(True, constants.CV_ENODELVM, node,
2433
               "rpc call to node failed (lvlist)")
2434
    else:
2435
      nimg.volumes = lvdata
2436
      nimg.lvm_fail = False
2437

    
2438
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2439
    """Verifies and updates the node instance list.
2440

2441
    If the listing was successful, then updates this node's instance
2442
    list. Otherwise, it marks the RPC call as failed for the instance
2443
    list key.
2444

2445
    @type ninfo: L{objects.Node}
2446
    @param ninfo: the node to check
2447
    @param nresult: the remote results for the node
2448
    @param nimg: the node image object
2449

2450
    """
2451
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2452
    test = not isinstance(idata, list)
2453
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2454
                  "rpc call to node failed (instancelist): %s",
2455
                  utils.SafeEncode(str(idata)))
2456
    if test:
2457
      nimg.hyp_fail = True
2458
    else:
2459
      nimg.instances = idata
2460

    
2461
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2462
    """Verifies and computes a node information map
2463

2464
    @type ninfo: L{objects.Node}
2465
    @param ninfo: the node to check
2466
    @param nresult: the remote results for the node
2467
    @param nimg: the node image object
2468
    @param vg_name: the configured VG name
2469

2470
    """
2471
    node = ninfo.name
2472
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2473

    
2474
    # try to read free memory (from the hypervisor)
2475
    hv_info = nresult.get(constants.NV_HVINFO, None)
2476
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2477
    _ErrorIf(test, constants.CV_ENODEHV, node,
2478
             "rpc call to node failed (hvinfo)")
2479
    if not test:
2480
      try:
2481
        nimg.mfree = int(hv_info["memory_free"])
2482
      except (ValueError, TypeError):
2483
        _ErrorIf(True, constants.CV_ENODERPC, node,
2484
                 "node returned invalid nodeinfo, check hypervisor")
2485

    
2486
    # FIXME: devise a free space model for file based instances as well
2487
    if vg_name is not None:
2488
      test = (constants.NV_VGLIST not in nresult or
2489
              vg_name not in nresult[constants.NV_VGLIST])
2490
      _ErrorIf(test, constants.CV_ENODELVM, node,
2491
               "node didn't return data for the volume group '%s'"
2492
               " - it is either missing or broken", vg_name)
2493
      if not test:
2494
        try:
2495
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2496
        except (ValueError, TypeError):
2497
          _ErrorIf(True, constants.CV_ENODERPC, node,
2498
                   "node returned invalid LVM info, check LVM status")
2499

    
2500
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2501
    """Gets per-disk status information for all instances.
2502

2503
    @type nodelist: list of strings
2504
    @param nodelist: Node names
2505
    @type node_image: dict of (name, L{objects.Node})
2506
    @param node_image: Node objects
2507
    @type instanceinfo: dict of (name, L{objects.Instance})
2508
    @param instanceinfo: Instance objects
2509
    @rtype: {instance: {node: [(success, payload)]}}
2510
    @return: a dictionary of per-instance dictionaries with nodes as
2511
        keys and disk information as values; the disk information is a
2512
        list of tuples (success, payload)
2513

2514
    """
2515
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2516

    
2517
    node_disks = {}
2518
    node_disks_devonly = {}
2519
    diskless_instances = set()
2520
    diskless = constants.DT_DISKLESS
2521

    
2522
    for nname in nodelist:
2523
      node_instances = list(itertools.chain(node_image[nname].pinst,
2524
                                            node_image[nname].sinst))
2525
      diskless_instances.update(inst for inst in node_instances
2526
                                if instanceinfo[inst].disk_template == diskless)
2527
      disks = [(inst, disk)
2528
               for inst in node_instances
2529
               for disk in instanceinfo[inst].disks]
2530

    
2531
      if not disks:
2532
        # No need to collect data
2533
        continue
2534

    
2535
      node_disks[nname] = disks
2536

    
2537
      # Creating copies as SetDiskID below will modify the objects and that can
2538
      # lead to incorrect data returned from nodes
2539
      devonly = [dev.Copy() for (_, dev) in disks]
2540

    
2541
      for dev in devonly:
2542
        self.cfg.SetDiskID(dev, nname)
2543

    
2544
      node_disks_devonly[nname] = devonly
2545

    
2546
    assert len(node_disks) == len(node_disks_devonly)
2547

    
2548
    # Collect data from all nodes with disks
2549
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2550
                                                          node_disks_devonly)
2551

    
2552
    assert len(result) == len(node_disks)
2553

    
2554
    instdisk = {}
2555

    
2556
    for (nname, nres) in result.items():
2557
      disks = node_disks[nname]
2558

    
2559
      if nres.offline:
2560
        # No data from this node
2561
        data = len(disks) * [(False, "node offline")]
2562
      else:
2563
        msg = nres.fail_msg
2564
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2565
                 "while getting disk information: %s", msg)
2566
        if msg:
2567
          # No data from this node
2568
          data = len(disks) * [(False, msg)]
2569
        else:
2570
          data = []
2571
          for idx, i in enumerate(nres.payload):
2572
            if isinstance(i, (tuple, list)) and len(i) == 2:
2573
              data.append(i)
2574
            else:
2575
              logging.warning("Invalid result from node %s, entry %d: %s",
2576
                              nname, idx, i)
2577
              data.append((False, "Invalid result from the remote node"))
2578

    
2579
      for ((inst, _), status) in zip(disks, data):
2580
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2581

    
2582
    # Add empty entries for diskless instances.
2583
    for inst in diskless_instances:
2584
      assert inst not in instdisk
2585
      instdisk[inst] = {}
2586

    
2587
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2588
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2589
                      compat.all(isinstance(s, (tuple, list)) and
2590
                                 len(s) == 2 for s in statuses)
2591
                      for inst, nnames in instdisk.items()
2592
                      for nname, statuses in nnames.items())
2593
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
2594

    
2595
    return instdisk
2596

    
2597
  @staticmethod
2598
  def _SshNodeSelector(group_uuid, all_nodes):
2599
    """Create endless iterators for all potential SSH check hosts.
2600

2601
    """
2602
    nodes = [node for node in all_nodes
2603
             if (node.group != group_uuid and
2604
                 not node.offline)]
2605
    keyfunc = operator.attrgetter("group")
2606

    
2607
    return map(itertools.cycle,
2608
               [sorted(map(operator.attrgetter("name"), names))
2609
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2610
                                                  keyfunc)])
2611

    
2612
  @classmethod
2613
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2614
    """Choose which nodes should talk to which other nodes.
2615

2616
    We will make nodes contact all nodes in their group, and one node from
2617
    every other group.
2618

2619
    @warning: This algorithm has a known issue if one node group is much
2620
      smaller than others (e.g. just one node). In such a case all other
2621
      nodes will talk to the single node.
2622

2623
    """
2624
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2625
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2626

    
2627
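    # Each online node is paired with one peer from every other node group;
    # cycling the per-group iterators spreads these checks over the groups'
    # members rather than always picking the same host.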
    return (online_nodes,
2628
            dict((name, sorted([i.next() for i in sel]))
2629
                 for name in online_nodes))
2630

    
2631
  def BuildHooksEnv(self):
2632
    """Build hooks env.
2633

2634
    Cluster-Verify hooks just ran in the post phase and their failure makes
2635
    the output be logged in the verify output and the verification to fail.
2636

2637
    """
2638
    env = {
2639
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2640
      }
2641

    
2642
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2643
               for node in self.my_node_info.values())
2644

    
2645
    return env
2646

    
2647
  def BuildHooksNodes(self):
2648
    """Build hooks nodes.
2649

2650
    """
2651
    return ([], self.my_node_names)
2652

    
2653
  def Exec(self, feedback_fn):
2654
    """Verify integrity of the node group, performing various test on nodes.
2655

2656
    """
2657
    # This method has too many local variables. pylint: disable=R0914
2658
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2659

    
2660
    if not self.my_node_names:
2661
      # empty node group
2662
      feedback_fn("* Empty node group, skipping verification")
2663
      return True
2664

    
2665
    self.bad = False
2666
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2667
    verbose = self.op.verbose
2668
    self._feedback_fn = feedback_fn
2669

    
2670
    vg_name = self.cfg.GetVGName()
2671
    drbd_helper = self.cfg.GetDRBDHelper()
2672
    cluster = self.cfg.GetClusterInfo()
2673
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2674
    hypervisors = cluster.enabled_hypervisors
2675
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2676

    
2677
    i_non_redundant = [] # Non redundant instances
2678
    i_non_a_balanced = [] # Non auto-balanced instances
2679
    i_offline = 0 # Count of offline instances
2680
    n_offline = 0 # Count of offline nodes
2681
    n_drained = 0 # Count of nodes being drained
2682
    node_vol_should = {}
2683

    
2684
    # FIXME: verify OS list
2685

    
2686
    # File verification
2687
    filemap = _ComputeAncillaryFiles(cluster, False)
2688

    
2689
    # do local checksums
2690
    master_node = self.master_node = self.cfg.GetMasterNode()
2691
    master_ip = self.cfg.GetMasterIP()
2692

    
2693
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2694

    
2695
    user_scripts = []
2696
    if self.cfg.GetUseExternalMipScript():
2697
      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
2698

    
2699
    node_verify_param = {
2700
      constants.NV_FILELIST:
2701
        utils.UniqueSequence(filename
2702
                             for files in filemap
2703
                             for filename in files),
2704
      constants.NV_NODELIST:
2705
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2706
                                  self.all_node_info.values()),
2707
      constants.NV_HYPERVISOR: hypervisors,
2708
      constants.NV_HVPARAMS:
2709
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2710
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2711
                                 for node in node_data_list
2712
                                 if not node.offline],
2713
      constants.NV_INSTANCELIST: hypervisors,
2714
      constants.NV_VERSION: None,
2715
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2716
      constants.NV_NODESETUP: None,
2717
      constants.NV_TIME: None,
2718
      constants.NV_MASTERIP: (master_node, master_ip),
2719
      constants.NV_OSLIST: None,
2720
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2721
      constants.NV_USERSCRIPTS: user_scripts,
2722
      }
2723

    
2724
    if vg_name is not None:
2725
      node_verify_param[constants.NV_VGLIST] = None
2726
      node_verify_param[constants.NV_LVLIST] = vg_name
2727
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2728
      node_verify_param[constants.NV_DRBDLIST] = None
2729

    
2730
    if drbd_helper:
2731
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2732

    
2733
    # bridge checks
2734
    # FIXME: this needs to be changed per node-group, not cluster-wide
2735
    bridges = set()
2736
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2737
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2738
      bridges.add(default_nicpp[constants.NIC_LINK])
2739
    for instance in self.my_inst_info.values():
2740
      for nic in instance.nics:
2741
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2742
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2743
          bridges.add(full_nic[constants.NIC_LINK])
2744

    
2745
    if bridges:
2746
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2747

    
2748
    # Build our expected cluster state
2749
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2750
                                                 name=node.name,
2751
                                                 vm_capable=node.vm_capable))
2752
                      for node in node_data_list)
2753

    
2754
    # Gather OOB paths
2755
    oob_paths = []
2756
    for node in self.all_node_info.values():
2757
      path = _SupportsOob(self.cfg, node)
2758
      if path and path not in oob_paths:
2759
        oob_paths.append(path)
2760

    
2761
    if oob_paths:
2762
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2763

    
2764
    for instance in self.my_inst_names:
2765
      inst_config = self.my_inst_info[instance]
2766

    
2767
      for nname in inst_config.all_nodes:
2768
        if nname not in node_image:
2769
          gnode = self.NodeImage(name=nname)
2770
          gnode.ghost = (nname not in self.all_node_info)
2771
          node_image[nname] = gnode
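          # The "ghost" image keeps node_image complete, so the instance and
          # disk checks below can index it even for nodes unknown to the
          # configuration (see the ghost handling in _VerifyInstance).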
2772

    
2773
      inst_config.MapLVsByNode(node_vol_should)
2774

    
2775
      pnode = inst_config.primary_node
2776
      node_image[pnode].pinst.append(instance)
2777

    
2778
      for snode in inst_config.secondary_nodes:
2779
        nimg = node_image[snode]
2780
        nimg.sinst.append(instance)
2781
        if pnode not in nimg.sbp:
2782
          nimg.sbp[pnode] = []
2783
        nimg.sbp[pnode].append(instance)
2784

    
2785
    # At this point, we have the in-memory data structures complete,
2786
    # except for the runtime information, which we'll gather next
2787

    
2788
    # Due to the way our RPC system works, exact response times cannot be
2789
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2790
    # time before and after executing the request, we can at least have a time
2791
    # window.
2792
    nvinfo_starttime = time.time()
2793
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2794
                                           node_verify_param,
2795
                                           self.cfg.GetClusterName())
2796
    nvinfo_endtime = time.time()
2797

    
2798
    if self.extra_lv_nodes and vg_name is not None:
2799
      extra_lv_nvinfo = \
2800
          self.rpc.call_node_verify(self.extra_lv_nodes,
2801
                                    {constants.NV_LVLIST: vg_name},
2802
                                    self.cfg.GetClusterName())
2803
    else:
2804
      extra_lv_nvinfo = {}
2805

    
2806
    all_drbd_map = self.cfg.ComputeDRBDMap()
2807

    
2808
    feedback_fn("* Gathering disk information (%s nodes)" %
2809
                len(self.my_node_names))
2810
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2811
                                     self.my_inst_info)
2812

    
2813
    feedback_fn("* Verifying configuration file consistency")
2814

    
2815
    # If not all nodes are being checked, we need to make sure the master node
2816
    # and a non-checked vm_capable node are in the list.
2817
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2818
    if absent_nodes:
2819
      vf_nvinfo = all_nvinfo.copy()
2820
      vf_node_info = list(self.my_node_info.values())
2821
      additional_nodes = []
2822
      if master_node not in self.my_node_info:
2823
        additional_nodes.append(master_node)
2824
        vf_node_info.append(self.all_node_info[master_node])
2825
      # Add the first vm_capable node we find which is not included
2826
      for node in absent_nodes:
2827
        nodeinfo = self.all_node_info[node]
2828
        if nodeinfo.vm_capable and not nodeinfo.offline:
2829
          additional_nodes.append(node)
2830
          vf_node_info.append(self.all_node_info[node])
2831
          break
2832
      key = constants.NV_FILELIST
2833
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2834
                                                 {key: node_verify_param[key]},
2835
                                                 self.cfg.GetClusterName()))
2836
    else:
2837
      vf_nvinfo = all_nvinfo
2838
      vf_node_info = self.my_node_info.values()
2839

    
2840
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2841

    
2842
    feedback_fn("* Verifying node status")
2843

    
2844
    refos_img = None
2845

    
2846
    for node_i in node_data_list:
2847
      node = node_i.name
2848
      nimg = node_image[node]
2849

    
2850
      if node_i.offline:
2851
        if verbose:
2852
          feedback_fn("* Skipping offline node %s" % (node,))
2853
        n_offline += 1
2854
        continue
2855

    
2856
      if node == master_node:
2857
        ntype = "master"
2858
      elif node_i.master_candidate:
2859
        ntype = "master candidate"
2860
      elif node_i.drained:
2861
        ntype = "drained"
2862
        n_drained += 1
2863
      else:
2864
        ntype = "regular"
2865
      if verbose:
2866
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2867

    
2868
      msg = all_nvinfo[node].fail_msg
2869
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2870
               msg)
2871
      if msg:
2872
        nimg.rpc_fail = True
2873
        continue
2874

    
2875
      nresult = all_nvinfo[node].payload
2876

    
2877
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2878
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2879
      self._VerifyNodeNetwork(node_i, nresult)
2880
      self._VerifyNodeUserScripts(node_i, nresult)
2881
      self._VerifyOob(node_i, nresult)
2882

    
2883
      if nimg.vm_capable:
2884
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2885
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2886
                             all_drbd_map)
2887

    
2888
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2889
        self._UpdateNodeInstances(node_i, nresult, nimg)
2890
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2891
        self._UpdateNodeOS(node_i, nresult, nimg)
2892

    
2893
        if not nimg.os_fail:
2894
          if refos_img is None:
2895
            refos_img = nimg
2896
          self._VerifyNodeOS(node_i, nimg, refos_img)
2897
        self._VerifyNodeBridges(node_i, nresult, bridges)
2898

    
2899
        # Check whether all running instances are primary for the node. (This
2900
        # can no longer be done from _VerifyInstance below, since some of the
2901
        # wrong instances could be from other node groups.)
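        # (nimg.instances is what the node itself reported as running, while
        # nimg.pinst was built from the configuration, i.e. the instances that
        # have this node as their primary node.)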
2902
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2903

    
2904
        for inst in non_primary_inst:
2905
          # FIXME: investigate best way to handle offline insts
2906
          if inst.admin_state == constants.ADMINST_OFFLINE:
2907
            if verbose:
2908
              feedback_fn("* Skipping offline instance %s" % inst.name)
2909
            i_offline += 1
2910
            continue
2911
          test = inst in self.all_inst_info
2912
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2913
                   "instance should not run on node %s", node_i.name)
2914
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2915
                   "node is running unknown instance %s", inst)
2916

    
2917
    for node, result in extra_lv_nvinfo.items():
2918
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2919
                              node_image[node], vg_name)
2920

    
2921
    feedback_fn("* Verifying instance status")
2922
    for instance in self.my_inst_names:
2923
      if verbose:
2924
        feedback_fn("* Verifying instance %s" % instance)
2925
      inst_config = self.my_inst_info[instance]
2926
      self._VerifyInstance(instance, inst_config, node_image,
2927
                           instdisk[instance])
2928
      inst_nodes_offline = []
2929

    
2930
      pnode = inst_config.primary_node
2931
      pnode_img = node_image[pnode]
2932
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2933
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2934
               " primary node failed", instance)
2935

    
2936
      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
2937
               pnode_img.offline,
2938
               constants.CV_EINSTANCEBADNODE, instance,
2939
               "instance is marked as running and lives on offline node %s",
2940
               inst_config.primary_node)
2941

    
2942
      # If the instance is non-redundant we cannot survive losing its primary
2943
      # node, so we are not N+1 compliant. On the other hand we have no disk
2944
      # templates with more than one secondary so that situation is not well
2945
      # supported either.
2946
      # FIXME: does not support file-backed instances
2947
      if not inst_config.secondary_nodes:
2948
        i_non_redundant.append(instance)
2949

    
2950
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2951
               constants.CV_EINSTANCELAYOUT,
2952
               instance, "instance has multiple secondary nodes: %s",
2953
               utils.CommaJoin(inst_config.secondary_nodes),
2954
               code=self.ETYPE_WARNING)
2955

    
2956
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2957
        pnode = inst_config.primary_node
2958
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2959
        instance_groups = {}
2960

    
2961
        for node in instance_nodes:
2962
          instance_groups.setdefault(self.all_node_info[node].group,
2963
                                     []).append(node)
2964

    
2965
        pretty_list = [
2966
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2967
          # Sort so that we always list the primary node first.
2968
          for group, nodes in sorted(instance_groups.items(),
2969
                                     key=lambda (_, nodes): pnode in nodes,
2970
                                     reverse=True)]
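        # Illustrative example (group and node names made up): pretty_list
        # could end up as ["node1, node2 (group default)", "node3 (group other)"],
        # with the primary node's group listed first.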
2971

    
2972
        self._ErrorIf(len(instance_groups) > 1,
2973
                      constants.CV_EINSTANCESPLITGROUPS,
2974
                      instance, "instance has primary and secondary nodes in"
2975
                      " different groups: %s", utils.CommaJoin(pretty_list),
2976
                      code=self.ETYPE_WARNING)
2977

    
2978
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2979
        i_non_a_balanced.append(instance)
2980

    
2981
      for snode in inst_config.secondary_nodes:
2982
        s_img = node_image[snode]
2983
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2984
                 snode, "instance %s, connection to secondary node failed",
2985
                 instance)
2986

    
2987
        if s_img.offline:
2988
          inst_nodes_offline.append(snode)
2989

    
2990
      # warn that the instance lives on offline nodes
2991
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2992
               "instance has offline secondary node(s) %s",
2993
               utils.CommaJoin(inst_nodes_offline))
2994
      # ... or ghost/non-vm_capable nodes
2995
      for node in inst_config.all_nodes:
2996
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
2997
                 instance, "instance lives on ghost node %s", node)
2998
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
2999
                 instance, "instance lives on non-vm_capable node %s", node)
3000

    
3001
    feedback_fn("* Verifying orphan volumes")
3002
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3003

    
3004
    # We will get spurious "unknown volume" warnings if any node of this group
3005
    # is secondary for an instance whose primary is in another group. To avoid
3006
    # them, we find these instances and add their volumes to node_vol_should.
3007
    for inst in self.all_inst_info.values():
3008
      for secondary in inst.secondary_nodes:
3009
        if (secondary in self.my_node_info
3010
            and inst.name not in self.my_inst_info):
3011
          inst.MapLVsByNode(node_vol_should)
3012
          break
3013

    
3014
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3015

    
3016
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3017
      feedback_fn("* Verifying N+1 Memory redundancy")
3018
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3019

    
3020
    feedback_fn("* Other Notes")
3021
    if i_non_redundant:
3022
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3023
                  % len(i_non_redundant))
3024

    
3025
    if i_non_a_balanced:
3026
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3027
                  % len(i_non_a_balanced))
3028

    
3029
    if i_offline:
3030
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3031

    
3032
    if n_offline:
3033
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3034

    
3035
    if n_drained:
3036
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3037

    
3038
    return not self.bad
3039

    
3040
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3041
    """Analyze the post-hooks' result
3042

3043
    This method analyses the hook result, handles it, and sends some
3044
    nicely-formatted feedback back to the user.
3045

3046
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3047
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3048
    @param hooks_results: the results of the multi-node hooks rpc call
3049
    @param feedback_fn: function used to send feedback back to the caller
3050
    @param lu_result: previous Exec result
3051
    @return: the new Exec result, based on the previous result
3052
        and hook results
3053

3054
    """
3055
    # We only really run POST phase hooks, only for non-empty groups,
3056
    # and are only interested in their results
3057
    if not self.my_node_names:
3058
      # empty node group
3059
      pass
3060
    elif phase == constants.HOOKS_PHASE_POST:
3061
      # Used to change hooks' output to proper indentation
3062
      feedback_fn("* Hooks Results")
3063
      assert hooks_results, "invalid result from hooks"
3064

    
3065
      for node_name in hooks_results:
3066
        res = hooks_results[node_name]
3067
        msg = res.fail_msg
3068
        test = msg and not res.offline
3069
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3070
                      "Communication failure in hooks execution: %s", msg)
3071
        if res.offline or msg:
3072
          # No need to investigate payload if node is offline or gave
3073
          # an error.
3074
          continue
3075
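        # Illustrative shape of res.payload (script name and output made up):
        #   [("10-check-something", constants.HKR_SUCCESS, "everything ok")]
        # i.e. a list of (script, status, captured output) tuples.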
        for script, hkr, output in res.payload:
3076
          test = hkr == constants.HKR_FAIL
3077
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3078
                        "Script %s failed, output:", script)
3079
          if test:
3080
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3081
            feedback_fn("%s" % output)
3082
            lu_result = False
3083

    
3084
    return lu_result
3085

    
3086

    
3087
class LUClusterVerifyDisks(NoHooksLU):
3088
  """Verifies the cluster disks status.
3089

3090
  """
3091
  REQ_BGL = False
3092

    
3093
  def ExpandNames(self):
3094
    self.share_locks = _ShareAll()
3095
    self.needed_locks = {
3096
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3097
      }
3098

    
3099
  def Exec(self, feedback_fn):
3100
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3101

    
3102
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3103
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3104
                           for group in group_names])
3105

    
3106

    
3107
class LUGroupVerifyDisks(NoHooksLU):
3108
  """Verifies the status of all disks in a node group.
3109

3110
  """
3111
  REQ_BGL = False
3112

    
3113
  def ExpandNames(self):
3114
    # Raises errors.OpPrereqError on its own if group can't be found
3115
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3116

    
3117
    self.share_locks = _ShareAll()
3118
    self.needed_locks = {
3119
      locking.LEVEL_INSTANCE: [],
3120
      locking.LEVEL_NODEGROUP: [],
3121
      locking.LEVEL_NODE: [],
3122
      }
3123

    
3124
  def DeclareLocks(self, level):
3125
    if level == locking.LEVEL_INSTANCE:
3126
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3127

    
3128
      # Lock instances optimistically, needs verification once node and group
3129
      # locks have been acquired
3130
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3131
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3132

    
3133
    elif level == locking.LEVEL_NODEGROUP:
3134
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3135

    
3136
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3137
        set([self.group_uuid] +
3138
            # Lock all groups used by instances optimistically; this requires
3139
            # going via the node before it's locked, requiring verification
3140
            # later on
3141
            [group_uuid
3142
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3143
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3144

    
3145
    elif level == locking.LEVEL_NODE:
3146
      # This will only lock the nodes in the group to be verified which contain
3147
      # actual instances
3148
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3149
      self._LockInstancesNodes()
3150

    
3151
      # Lock all nodes in group to be verified
3152
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3153
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3154
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3155

    
3156
  def CheckPrereq(self):
3157
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3158
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3159
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3160

    
3161
    assert self.group_uuid in owned_groups
3162

    
3163
    # Check if locked instances are still correct
3164
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3165

    
3166
    # Get instance information
3167
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3168

    
3169
    # Check if node groups for locked instances are still correct
3170
    for (instance_name, inst) in self.instances.items():
3171
      assert owned_nodes.issuperset(inst.all_nodes), \
3172
        "Instance %s's nodes changed while we kept the lock" % instance_name
3173

    
3174
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3175
                                             owned_groups)
3176

    
3177
      assert self.group_uuid in inst_groups, \
3178
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3179

    
3180
  def Exec(self, feedback_fn):
3181
    """Verify integrity of cluster disks.
3182

3183
    @rtype: tuple of three items
3184
    @return: a tuple of (dict of node-to-node_error, list of instances
3185
        which need activate-disks, dict of instance: (node, volume) for
3186
        missing volumes)
3187

3188
    """
3189
    res_nodes = {}
3190
    res_instances = set()
3191
    res_missing = {}
3192

    
3193
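    # nv_dict maps (node name, volume name) to the owning instance for every
    # instance that is administratively up; whatever is left in it after the
    # per-node LV listing below corresponds to missing volumes.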
    nv_dict = _MapInstanceDisksToNodes([inst
3194
            for inst in self.instances.values()
3195
            if inst.admin_state == constants.ADMINST_UP])
3196

    
3197
    if nv_dict:
3198
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3199
                             set(self.cfg.GetVmCapableNodeList()))
3200

    
3201
      node_lvs = self.rpc.call_lv_list(nodes, [])
3202

    
3203
      for (node, node_res) in node_lvs.items():
3204
        if node_res.offline:
3205
          continue
3206

    
3207
        msg = node_res.fail_msg
3208
        if msg:
3209
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3210
          res_nodes[node] = msg
3211
          continue
3212

    
3213
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3214
          inst = nv_dict.pop((node, lv_name), None)
3215
          if not (lv_online or inst is None):
3216
            res_instances.add(inst)
3217

    
3218
      # any leftover items in nv_dict are missing LVs, let's arrange the data
3219
      # better
3220
      for key, inst in nv_dict.iteritems():
3221
        res_missing.setdefault(inst, []).append(list(key))
3222

    
3223
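    # Illustrative result (all names made up): a dict of per-node errors such
    # as {"node1.example.com": "rpc error"}, the instances whose disks need to
    # be re-activated, and a dict mapping each instance to its missing
    # [node, volume] pairs, e.g. [["node2.example.com", "xenvg/lv0"]].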
    return (res_nodes, list(res_instances), res_missing)
3224

    
3225

    
3226
class LUClusterRepairDiskSizes(NoHooksLU):
3227
  """Verifies the cluster disks sizes.
3228

3229
  """
3230
  REQ_BGL = False
3231

    
3232
  def ExpandNames(self):
3233
    if self.op.instances:
3234
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3235
      self.needed_locks = {
3236
        locking.LEVEL_NODE_RES: [],
3237
        locking.LEVEL_INSTANCE: self.wanted_names,
3238
        }
3239
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3240
    else:
3241
      self.wanted_names = None
3242
      self.needed_locks = {
3243
        locking.LEVEL_NODE_RES: locking.ALL_SET,
3244
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3245
        }
3246
    self.share_locks = _ShareAll()
3247

    
3248
  def DeclareLocks(self, level):
3249
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
3250
      self._LockInstancesNodes(primary_only=True, level=level)
3251

    
3252
  def CheckPrereq(self):
3253
    """Check prerequisites.
3254

3255
    This only checks the optional instance list against the existing names.
3256

3257
    """
3258
    if self.wanted_names is None:
3259
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3260

    
3261
    self.wanted_instances = \
3262
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3263

    
3264
  def _EnsureChildSizes(self, disk):
3265
    """Ensure children of the disk have the needed disk size.
3266

3267
    This is valid mainly for DRBD8 and fixes an issue where the
3268
    children have a smaller disk size than the parent.
3269

3270
    @param disk: an L{ganeti.objects.Disk} object
3271

3272
    """
3273
    if disk.dev_type == constants.LD_DRBD8:
3274
      assert disk.children, "Empty children for DRBD8?"
3275
      fchild = disk.children[0]
3276
      mismatch = fchild.size < disk.size
3277
      if mismatch:
3278
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3279
                     fchild.size, disk.size)
3280
        fchild.size = disk.size
3281

    
3282
      # and we recurse on this child only, not on the metadev
3283
      return self._EnsureChildSizes(fchild) or mismatch
3284
    else:
3285
      return False
3286

    
3287
  def Exec(self, feedback_fn):
3288
    """Verify the size of cluster disks.
3289

3290
    """
3291
    # TODO: check child disks too
3292
    # TODO: check differences in size between primary/secondary nodes
3293
    per_node_disks = {}
3294
    for instance in self.wanted_instances:
3295
      pnode = instance.primary_node
3296
      if pnode not in per_node_disks:
3297
        per_node_disks[pnode] = []
3298
      for idx, disk in enumerate(instance.disks):
3299
        per_node_disks[pnode].append((instance, idx, disk))
3300

    
3301
    assert not (frozenset(per_node_disks.keys()) -
3302
                self.owned_locks(locking.LEVEL_NODE_RES)), \
3303
      "Not owning correct locks"
3304
    assert not self.owned_locks(locking.LEVEL_NODE)
3305

    
3306
    changed = []
3307
    for node, dskl in per_node_disks.items():
3308
      newl = [v[2].Copy() for v in dskl]
3309
      for dsk in newl:
3310
        self.cfg.SetDiskID(dsk, node)
3311
      result = self.rpc.call_blockdev_getsize(node, newl)
3312
      if result.fail_msg:
3313
        self.LogWarning("Failure in blockdev_getsize call to node"
3314
                        " %s, ignoring", node)
3315
        continue
3316
      if len(result.payload) != len(dskl):
3317
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3318
                        " result.payload=%s", node, len(dskl), result.payload)
3319
        self.LogWarning("Invalid result from node %s, ignoring node results",
3320
                        node)
3321
        continue
3322
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3323
        if size is None:
3324
          self.LogWarning("Disk %d of instance %s did not return size"
3325
                          " information, ignoring", idx, instance.name)
3326
          continue
3327
        if not isinstance(size, (int, long)):
3328
          self.LogWarning("Disk %d of instance %s did not return valid"
3329
                          " size information, ignoring", idx, instance.name)
3330
          continue
3331
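        # blockdev_getsize reports bytes while disk.size is kept in MiB, hence
        # the shift by 20 bits (e.g. 10737418240 bytes >> 20 == 10240 MiB)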
        size = size >> 20
3332
        if size != disk.size:
3333
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3334
                       " correcting: recorded %d, actual %d", idx,
3335
                       instance.name, disk.size, size)
3336
          disk.size = size
3337
          self.cfg.Update(instance, feedback_fn)
3338
          changed.append((instance.name, idx, size))
3339
        if self._EnsureChildSizes(disk):
3340
          self.cfg.Update(instance, feedback_fn)
3341
          changed.append((instance.name, idx, disk.size))
3342
    return changed
3343

    
3344

    
3345
class LUClusterRename(LogicalUnit):
3346
  """Rename the cluster.
3347

3348
  """
3349
  HPATH = "cluster-rename"
3350
  HTYPE = constants.HTYPE_CLUSTER
3351

    
3352
  def BuildHooksEnv(self):
3353
    """Build hooks env.
3354

3355
    """
3356
    return {
3357
      "OP_TARGET": self.cfg.GetClusterName(),
3358
      "NEW_NAME": self.op.name,
3359
      }
3360

    
3361
  def BuildHooksNodes(self):
3362
    """Build hooks nodes.
3363

3364
    """
3365
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3366

    
3367
  def CheckPrereq(self):
3368
    """Verify that the passed name is a valid one.
3369

3370
    """
3371
    hostname = netutils.GetHostname(name=self.op.name,
3372
                                    family=self.cfg.GetPrimaryIPFamily())
3373

    
3374
    new_name = hostname.name
3375
    self.ip = new_ip = hostname.ip
3376
    old_name = self.cfg.GetClusterName()
3377
    old_ip = self.cfg.GetMasterIP()
3378
    if new_name == old_name and new_ip == old_ip:
3379
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3380
                                 " cluster has changed",
3381
                                 errors.ECODE_INVAL)
3382
    if new_ip != old_ip:
3383
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3384
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3385
                                   " reachable on the network" %
3386
                                   new_ip, errors.ECODE_NOTUNIQUE)
3387

    
3388
    self.op.name = new_name
3389

    
3390
  def Exec(self, feedback_fn):
3391
    """Rename the cluster.
3392

3393
    """
3394
    clustername = self.op.name
3395
    new_ip = self.ip
3396

    
3397
    # shutdown the master IP
3398
    master_params = self.cfg.GetMasterNetworkParameters()
3399
    ems = self.cfg.GetUseExternalMipScript()
3400
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3401
                                                     master_params, ems)
3402
    result.Raise("Could not disable the master role")
3403

    
3404
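    # The configuration update and known_hosts redistribution happen with the
    # master IP down; the finally clause below re-activates it (with the new
    # address) even if something in between fails.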
    try:
3405
      cluster = self.cfg.GetClusterInfo()
3406
      cluster.cluster_name = clustername
3407
      cluster.master_ip = new_ip
3408
      self.cfg.Update(cluster, feedback_fn)
3409

    
3410
      # update the known hosts file
3411
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3412
      node_list = self.cfg.GetOnlineNodeList()
3413
      try:
3414
        node_list.remove(master_params.name)
3415
      except ValueError:
3416
        pass
3417
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3418
    finally:
3419
      master_params.ip = new_ip
3420
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3421
                                                     master_params, ems)
3422
      msg = result.fail_msg
3423
      if msg:
3424
        self.LogWarning("Could not re-enable the master role on"
3425
                        " the master, please restart manually: %s", msg)
3426

    
3427
    return clustername
3428

    
3429

    
3430
def _ValidateNetmask(cfg, netmask):
3431
  """Checks if a netmask is valid.
3432

3433
  @type cfg: L{config.ConfigWriter}
3434
  @param cfg: The cluster configuration
3435
  @type netmask: int
3436
  @param netmask: the netmask to be verified
3437
  @raise errors.OpPrereqError: if the validation fails
3438

3439
  """
3440
  ip_family = cfg.GetPrimaryIPFamily()
3441
  try:
3442
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
3443
  except errors.ProgrammerError:
3444
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
3445
                               ip_family)
3446
  if not ipcls.ValidateNetmask(netmask):
3447
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
3448
                                (netmask))
3449

    
3450

    
3451
class LUClusterSetParams(LogicalUnit):
3452
  """Change the parameters of the cluster.
3453

3454
  """
3455
  HPATH = "cluster-modify"
3456
  HTYPE = constants.HTYPE_CLUSTER
3457
  REQ_BGL = False
3458

    
3459
  def CheckArguments(self):
3460
    """Check parameters
3461

3462
    """
3463
    if self.op.uid_pool:
3464
      uidpool.CheckUidPool(self.op.uid_pool)
3465

    
3466
    if self.op.add_uids:
3467
      uidpool.CheckUidPool(self.op.add_uids)
3468

    
3469
    if self.op.remove_uids:
3470
      uidpool.CheckUidPool(self.op.remove_uids)
3471

    
3472
    if self.op.master_netmask is not None:
3473
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3474

    
3475
  def ExpandNames(self):
3476
    # FIXME: in the future maybe modifying other cluster params won't require
3477
    # checking on all nodes.
3478
    self.needed_locks = {
3479
      locking.LEVEL_NODE: locking.ALL_SET,
3480
    }
3481
    self.share_locks[locking.LEVEL_NODE] = 1
3482

    
3483
  def BuildHooksEnv(self):
3484
    """Build hooks env.
3485

3486
    """
3487
    return {
3488
      "OP_TARGET": self.cfg.GetClusterName(),
3489
      "NEW_VG_NAME": self.op.vg_name,
3490
      }
3491

    
3492
  def BuildHooksNodes(self):
3493
    """Build hooks nodes.
3494

3495
    """
3496
    mn = self.cfg.GetMasterNode()
3497
    return ([mn], [mn])
3498

    
3499
  def CheckPrereq(self):
3500
    """Check prerequisites.
3501

3502
    This checks whether the given params don't conflict and
3503
    if the given volume group is valid.
3504

3505
    """
3506
    if self.op.vg_name is not None and not self.op.vg_name:
3507
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3508
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3509
                                   " instances exist", errors.ECODE_INVAL)
3510

    
3511
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3512
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3513
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3514
                                   " drbd-based instances exist",
3515
                                   errors.ECODE_INVAL)
3516

    
3517
    node_list = self.owned_locks(locking.LEVEL_NODE)
3518

    
3519
    # if vg_name is not None, check the given volume group on all nodes
3520
    if self.op.vg_name:
3521
      vglist = self.rpc.call_vg_list(node_list)
3522
      for node in node_list:
3523
        msg = vglist[node].fail_msg
3524
        if msg:
3525
          # ignoring down node
3526
          self.LogWarning("Error while gathering data on node %s"
3527
                          " (ignoring node): %s", node, msg)
3528
          continue
3529
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3530
                                              self.op.vg_name,
3531
                                              constants.MIN_VG_SIZE)
3532
        if vgstatus:
3533
          raise errors.OpPrereqError("Error on node '%s': %s" %
3534
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3535

    
3536
    if self.op.drbd_helper:
3537
      # check the given drbd helper on all nodes
3538
      helpers = self.rpc.call_drbd_helper(node_list)
3539
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3540
        if ninfo.offline:
3541
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3542
          continue
3543
        msg = helpers[node].fail_msg
3544
        if msg:
3545
          raise errors.OpPrereqError("Error checking drbd helper on node"
3546
                                     " '%s': %s" % (node, msg),
3547
                                     errors.ECODE_ENVIRON)
3548
        node_helper = helpers[node].payload
3549
        if node_helper != self.op.drbd_helper:
3550
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3551
                                     (node, node_helper), errors.ECODE_ENVIRON)
3552

    
3553
    self.cluster = cluster = self.cfg.GetClusterInfo()
3554
    # validate params changes
3555
    if self.op.beparams:
3556
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3557
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3558

    
3559
    if self.op.ndparams:
3560
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3561
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3562

    
3563
      # TODO: we need a more general way to handle resetting
3564
      # cluster-level parameters to default values
3565
      if self.new_ndparams["oob_program"] == "":
3566
        self.new_ndparams["oob_program"] = \
3567
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3568

    
3569
    if self.op.nicparams:
3570
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3571
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3572
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3573
      nic_errors = []
3574

    
3575
      # check all instances for consistency
3576
      for instance in self.cfg.GetAllInstancesInfo().values():
3577
        for nic_idx, nic in enumerate(instance.nics):
3578
          params_copy = copy.deepcopy(nic.nicparams)
3579
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3580

    
3581
          # check parameter syntax
3582
          try:
3583
            objects.NIC.CheckParameterSyntax(params_filled)
3584
          except errors.ConfigurationError, err:
3585
            nic_errors.append("Instance %s, nic/%d: %s" %
3586
                              (instance.name, nic_idx, err))
3587

    
3588
          # if we're moving instances to routed, check that they have an ip
3589
          target_mode = params_filled[constants.NIC_MODE]
3590
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3591
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3592
                              " address" % (instance.name, nic_idx))
3593
      if nic_errors:
3594
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3595
                                   "\n".join(nic_errors))
3596

    
3597
    # hypervisor list/parameters
3598
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3599
    if self.op.hvparams:
3600
      for hv_name, hv_dict in self.op.hvparams.items():
3601
        if hv_name not in self.new_hvparams:
3602
          self.new_hvparams[hv_name] = hv_dict
3603
        else:
3604
          self.new_hvparams[hv_name].update(hv_dict)
3605

    
3606
    # os hypervisor parameters
3607
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3608
    if self.op.os_hvp:
3609
      for os_name, hvs in self.op.os_hvp.items():
3610
        if os_name not in self.new_os_hvp:
3611
          self.new_os_hvp[os_name] = hvs
3612
        else:
3613
          for hv_name, hv_dict in hvs.items():
3614
            if hv_name not in self.new_os_hvp[os_name]:
3615
              self.new_os_hvp[os_name][hv_name] = hv_dict
3616
            else:
3617
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3618

    
3619
    # os parameters
3620
    self.new_osp = objects.FillDict(cluster.osparams, {})
3621
    if self.op.osparams:
3622
      for os_name, osp in self.op.osparams.items():
3623
        if os_name not in self.new_osp:
3624
          self.new_osp[os_name] = {}
3625

    
3626
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3627
                                                  use_none=True)
3628

    
3629
        if not self.new_osp[os_name]:
3630
          # we removed all parameters
3631
          del self.new_osp[os_name]
3632
        else:
3633
          # check the parameter validity (remote check)
3634
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3635
                         os_name, self.new_osp[os_name])
3636

    
3637
    # changes to the hypervisor list
3638
    if self.op.enabled_hypervisors is not None:
3639
      self.hv_list = self.op.enabled_hypervisors
3640
      for hv in self.hv_list:
3641
        # if the hypervisor doesn't already exist in the cluster
3642
        # hvparams, we initialize it to empty, and then (in both
3643
        # cases) we make sure to fill the defaults, as we might not
3644
        # have a complete defaults list if the hypervisor wasn't
3645
        # enabled before
3646
        if hv not in new_hvp:
3647
          new_hvp[hv] = {}
3648
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3649
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3650
    else:
3651
      self.hv_list = cluster.enabled_hypervisors
3652

    
3653
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3654
      # either the enabled list has changed, or the parameters have, validate
3655
      for hv_name, hv_params in self.new_hvparams.items():
3656
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3657
            (self.op.enabled_hypervisors and
3658
             hv_name in self.op.enabled_hypervisors)):
3659
          # either this is a new hypervisor, or its parameters have changed
3660
          hv_class = hypervisor.GetHypervisor(hv_name)
3661
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3662
          hv_class.CheckParameterSyntax(hv_params)
3663
          _CheckHVParams(self, node_list, hv_name, hv_params)
3664

    
3665
    if self.op.os_hvp:
3666
      # no need to check any newly-enabled hypervisors, since the
3667
      # defaults have already been checked in the above code-block
3668
      for os_name, os_hvp in self.new_os_hvp.items():
3669
        for hv_name, hv_params in os_hvp.items():
3670
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3671
          # we need to fill in the new os_hvp on top of the actual hv_p
3672
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3673
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3674
          hv_class = hypervisor.GetHypervisor(hv_name)
3675
          hv_class.CheckParameterSyntax(new_osp)
3676
          _CheckHVParams(self, node_list, hv_name, new_osp)
3677

    
3678
    if self.op.default_iallocator:
3679
      alloc_script = utils.FindFile(self.op.default_iallocator,
3680
                                    constants.IALLOCATOR_SEARCH_PATH,
3681
                                    os.path.isfile)
3682
      if alloc_script is None:
3683
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3684
                                   " specified" % self.op.default_iallocator,
3685
                                   errors.ECODE_INVAL)
3686

    
3687
  def Exec(self, feedback_fn):
3688
    """Change the parameters of the cluster.
3689

3690
    """
3691
    if self.op.vg_name is not None:
3692
      new_volume = self.op.vg_name
3693
      if not new_volume:
3694
        new_volume = None
3695
      if new_volume != self.cfg.GetVGName():
3696
        self.cfg.SetVGName(new_volume)
3697
      else:
3698
        feedback_fn("Cluster LVM configuration already in desired"
3699
                    " state, not changing")
3700
    if self.op.drbd_helper is not None:
3701
      new_helper = self.op.drbd_helper
3702
      if not new_helper:
3703
        new_helper = None
3704
      if new_helper != self.cfg.GetDRBDHelper():
3705
        self.cfg.SetDRBDHelper(new_helper)
3706
      else:
3707
        feedback_fn("Cluster DRBD helper already in desired state,"
3708
                    " not changing")
3709
    if self.op.hvparams:
3710
      self.cluster.hvparams = self.new_hvparams
3711
    if self.op.os_hvp:
3712
      self.cluster.os_hvp = self.new_os_hvp
3713
    if self.op.enabled_hypervisors is not None:
3714
      self.cluster.hvparams = self.new_hvparams
3715
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3716
    if self.op.beparams:
3717
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3718
    if self.op.nicparams:
3719
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3720
    if self.op.osparams:
3721
      self.cluster.osparams = self.new_osp
3722
    if self.op.ndparams:
3723
      self.cluster.ndparams = self.new_ndparams
3724

    
3725
    if self.op.candidate_pool_size is not None:
3726
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3727
      # we need to update the pool size here, otherwise the save will fail
3728
      _AdjustCandidatePool(self, [])
3729

    
3730
    if self.op.maintain_node_health is not None:
3731
      self.cluster.maintain_node_health = self.op.maintain_node_health
3732

    
3733
    if self.op.prealloc_wipe_disks is not None:
3734
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3735

    
3736
    if self.op.add_uids is not None:
3737
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3738

    
3739
    if self.op.remove_uids is not None:
3740
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3741

    
3742
    if self.op.uid_pool is not None:
3743
      self.cluster.uid_pool = self.op.uid_pool
3744

    
3745
    if self.op.default_iallocator is not None:
3746
      self.cluster.default_iallocator = self.op.default_iallocator
3747

    
3748
    if self.op.reserved_lvs is not None:
3749
      self.cluster.reserved_lvs = self.op.reserved_lvs
3750

    
3751
    if self.op.use_external_mip_script is not None:
3752
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
3753

    
3754
    def helper_os(aname, mods, desc):
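      # mods is a list of (action, OS name) pairs, e.g. (names made up)
      # [(constants.DDM_ADD, "debian-image"), (constants.DDM_REMOVE, "old-os")]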
3755
      desc += " OS list"
3756
      lst = getattr(self.cluster, aname)
3757
      for key, val in mods:
3758
        if key == constants.DDM_ADD:
3759
          if val in lst:
3760
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3761
          else:
3762
            lst.append(val)
3763
        elif key == constants.DDM_REMOVE:
3764
          if val in lst:
3765
            lst.remove(val)
3766
          else:
3767
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3768
        else:
3769
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3770

    
3771
    if self.op.hidden_os:
3772
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3773

    
3774
    if self.op.blacklisted_os:
3775
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3776

    
3777
    if self.op.master_netdev:
3778
      master_params = self.cfg.GetMasterNetworkParameters()
3779
      ems = self.cfg.GetUseExternalMipScript()
3780
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3781
                  self.cluster.master_netdev)
3782
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3783
                                                       master_params, ems)
3784
      result.Raise("Could not disable the master ip")
3785
      feedback_fn("Changing master_netdev from %s to %s" %
3786
                  (master_params.netdev, self.op.master_netdev))
3787
      self.cluster.master_netdev = self.op.master_netdev
3788

    
3789
    if self.op.master_netmask:
3790
      master_params = self.cfg.GetMasterNetworkParameters()
3791
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3792
      result = self.rpc.call_node_change_master_netmask(master_params.name,
3793
                                                        master_params.netmask,
3794
                                                        self.op.master_netmask,
3795
                                                        master_params.ip,
3796
                                                        master_params.netdev)
3797
      if result.fail_msg:
3798
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3799
        feedback_fn(msg)
3800

    
3801
      self.cluster.master_netmask = self.op.master_netmask
3802

    
3803
    self.cfg.Update(self.cluster, feedback_fn)
3804

    
3805
    if self.op.master_netdev:
3806
      master_params = self.cfg.GetMasterNetworkParameters()
3807
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3808
                  self.op.master_netdev)
3809
      ems = self.cfg.GetUseExternalMipScript()
3810
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3811
                                                     master_params, ems)
3812
      if result.fail_msg:
3813
        self.LogWarning("Could not re-enable the master ip on"
3814
                        " the master, please restart manually: %s",
3815
                        result.fail_msg)
3816

    
3817

    
3818
def _UploadHelper(lu, nodes, fname):
3819
  """Helper for uploading a file and showing warnings.
3820

3821
  """
3822
  if os.path.exists(fname):
3823
    result = lu.rpc.call_upload_file(nodes, fname)
3824
    for to_node, to_result in result.items():
3825
      msg = to_result.fail_msg
3826
      if msg:
3827
        msg = ("Copy of file %s to node %s failed: %s" %
3828
               (fname, to_node, msg))
3829
        lu.proc.LogWarning(msg)
3830

    
3831

    
3832
def _ComputeAncillaryFiles(cluster, redist):
3833
  """Compute files external to Ganeti which need to be consistent.
3834

3835
  @type redist: boolean
3836
  @param redist: Whether to include files which need to be redistributed
3837

3838
  """
3839
  # Compute files for all nodes
3840
  files_all = set([
3841
    constants.SSH_KNOWN_HOSTS_FILE,
3842
    constants.CONFD_HMAC_KEY,
3843
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3844
    constants.SPICE_CERT_FILE,
3845
    constants.SPICE_CACERT_FILE,
3846
    constants.RAPI_USERS_FILE,
3847
    ])
3848

    
3849
  if not redist:
3850
    files_all.update(constants.ALL_CERT_FILES)
3851
    files_all.update(ssconf.SimpleStore().GetFileList())
3852
  else:
3853
    # we need to ship at least the RAPI certificate
3854
    files_all.add(constants.RAPI_CERT_FILE)
3855

    
3856
  if cluster.modify_etc_hosts:
3857
    files_all.add(constants.ETC_HOSTS)
3858

    
3859
  # Files which are optional; these must:
3860
  # - be present in one other category as well
3861
  # - either exist or not exist on all nodes of that category (mc, vm all)
3862
  files_opt = set([
3863
    constants.RAPI_USERS_FILE,
3864
    ])
3865

    
3866
  # Files which should only be on master candidates
3867
  files_mc = set()
3868

    
3869
  if not redist:
3870
    files_mc.add(constants.CLUSTER_CONF_FILE)
3871

    
3872
    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
3873
    # replication
3874
    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
3875

    
3876
  # Files which should only be on VM-capable nodes
3877
  files_vm = set(filename
3878
    for hv_name in cluster.enabled_hypervisors
3879
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3880

    
3881
  files_opt |= set(filename
3882
    for hv_name in cluster.enabled_hypervisors
3883
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3884

    
3885
  # Filenames in each category must be unique
3886
  all_files_set = files_all | files_mc | files_vm
3887
  assert (len(all_files_set) ==
3888
          sum(map(len, [files_all, files_mc, files_vm]))), \
3889
         "Found file listed in more than one file list"
3890

    
3891
  # Optional files must be present in one other category
3892
  assert all_files_set.issuperset(files_opt), \
3893
         "Optional file not in a different required list"
3894

    
3895
  return (files_all, files_opt, files_mc, files_vm)
3896

    
3897

    
3898
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3899
  """Distribute additional files which are part of the cluster configuration.
3900

3901
  ConfigWriter takes care of distributing the config and ssconf files, but
3902
  there are more files which should be distributed to all nodes. This function
3903
  makes sure those are copied.
3904

3905
  @param lu: calling logical unit
3906
  @param additional_nodes: list of nodes not in the config to distribute to
3907
  @type additional_vm: boolean
3908
  @param additional_vm: whether the additional nodes are vm-capable or not
3909

3910
  """
3911
  # Gather target nodes
3912
  cluster = lu.cfg.GetClusterInfo()
3913
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3914

    
3915
  online_nodes = lu.cfg.GetOnlineNodeList()
3916
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3917

    
3918
  if additional_nodes is not None:
3919
    online_nodes.extend(additional_nodes)
3920
    if additional_vm:
3921
      vm_nodes.extend(additional_nodes)
3922

    
3923
  # Never distribute to master node
3924
  for nodelist in [online_nodes, vm_nodes]:
3925
    if master_info.name in nodelist:
3926
      nodelist.remove(master_info.name)
3927

    
3928
  # Gather file lists
3929
  (files_all, _, files_mc, files_vm) = \
3930
    _ComputeAncillaryFiles(cluster, True)
3931

    
3932
  # Never re-distribute configuration file from here
3933
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3934
              constants.CLUSTER_CONF_FILE in files_vm)
3935
  assert not files_mc, "Master candidates not handled in this function"
3936

    
3937
  filemap = [
3938
    (online_nodes, files_all),
3939
    (vm_nodes, files_vm),
3940
    ]
3941

    
3942
  # Upload the files
3943
  for (node_list, files) in filemap:
3944
    for fname in files:
3945
      _UploadHelper(lu, node_list, fname)
3946

    
3947

    
3948
class LUClusterRedistConf(NoHooksLU):
3949
  """Force the redistribution of cluster configuration.
3950

3951
  This is a very simple LU.
3952

3953
  """
3954
  REQ_BGL = False
3955

    
3956
  def ExpandNames(self):
3957
    self.needed_locks = {
3958
      locking.LEVEL_NODE: locking.ALL_SET,
3959
    }
3960
    self.share_locks[locking.LEVEL_NODE] = 1
3961

    
3962
  def Exec(self, feedback_fn):
3963
    """Redistribute the configuration.
3964

3965
    """
3966
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3967
    _RedistributeAncillaryFiles(self)
3968

    
3969

    
3970
class LUClusterActivateMasterIp(NoHooksLU):
3971
  """Activate the master IP on the master node.
3972

3973
  """
3974
  def Exec(self, feedback_fn):
3975
    """Activate the master IP.
3976

3977
    """
3978
    master_params = self.cfg.GetMasterNetworkParameters()
3979
    ems = self.cfg.GetUseExternalMipScript()
3980
    self.rpc.call_node_activate_master_ip(master_params.name,
3981
                                          master_params, ems)
3982

    
3983

    
3984
class LUClusterDeactivateMasterIp(NoHooksLU):
3985
  """Deactivate the master IP on the master node.
3986

3987
  """
3988
  def Exec(self, feedback_fn):
3989
    """Deactivate the master IP.
3990

3991
    """
3992
    master_params = self.cfg.GetMasterNetworkParameters()
3993
    ems = self.cfg.GetUseExternalMipScript()
3994
    self.rpc.call_node_deactivate_master_ip(master_params.name, master_params,
3995
                                            ems)
3996

    
3997

    
3998
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3999
  """Sleep and poll for an instance's disk to sync.
4000

4001
  """
4002
  if not instance.disks or disks is not None and not disks:
4003
    return True
4004

    
4005
  disks = _ExpandCheckDisks(instance, disks)
4006

    
4007
  if not oneshot:
4008
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
4009

    
4010
  node = instance.primary_node
4011

    
4012
  for dev in disks:
4013
    lu.cfg.SetDiskID(dev, node)
4014

    
4015
  # TODO: Convert to utils.Retry
4016

    
4017
  retries = 0
4018
  degr_retries = 10 # in seconds, as we sleep 1 second each time
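  # Overall behaviour: poll the primary node once per iteration, give up after
  # 10 consecutive RPC failures, sleep up to 60 seconds between successful
  # polls (bounded by the largest estimated sync time) and, once syncing looks
  # finished, retry for a few more seconds if any device still reports itself
  # as degraded.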
4019
  while True:
4020
    max_time = 0
4021
    done = True
4022
    cumul_degraded = False
4023
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
4024
    msg = rstats.fail_msg
4025
    if msg:
4026
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
4027
      retries += 1
4028
      if retries >= 10:
4029
        raise errors.RemoteError("Can't contact node %s for mirror data,"
4030
                                 " aborting." % node)
4031
      time.sleep(6)
4032
      continue
4033
    rstats = rstats.payload
4034
    retries = 0
4035
    for i, mstat in enumerate(rstats):
4036
      if mstat is None:
4037
        lu.LogWarning("Can't compute data for node %s/%s",
4038
                      node, disks[i].iv_name)
4039
        continue
4040

    
4041
      cumul_degraded = (cumul_degraded or
4042
                        (mstat.is_degraded and mstat.sync_percent is None))
4043
      if mstat.sync_percent is not None:
4044
        done = False
4045
        if mstat.estimated_time is not None:
4046
          rem_time = ("%s remaining (estimated)" %
4047
                      utils.FormatSeconds(mstat.estimated_time))
4048
          max_time = mstat.estimated_time
4049
        else:
4050
          rem_time = "no time estimate"
4051
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
4052
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
4053

    
4054
    # if we're done but degraded, let's do a few small retries, to
4055
    # make sure we see a stable and not transient situation; therefore
4056
    # we force restart of the loop
4057
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
4058
      logging.info("Degraded disks found, %d retries left", degr_retries)
4059
      degr_retries -= 1
4060
      time.sleep(1)
4061
      continue
4062

    
4063
    if done or oneshot:
4064
      break
4065

    
4066
    time.sleep(min(60, max_time))
4067

    
4068
  if done:
4069
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
4070
  return not cumul_degraded
4071

    
4072

    
4073
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
4074
  """Check that mirrors are not degraded.
4075

4076
  The ldisk parameter, if True, will change the test from the
4077
  is_degraded attribute (which represents overall non-ok status for
4078
  the device(s)) to the ldisk (representing the local storage status).
4079

4080
  """
4081
  lu.cfg.SetDiskID(dev, node)
4082

    
4083
  result = True
4084

    
4085
  if on_primary or dev.AssembleOnSecondary():
4086
    rstats = lu.rpc.call_blockdev_find(node, dev)
4087
    msg = rstats.fail_msg
4088
    if msg:
4089
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
4090
      result = False
4091
    elif not rstats.payload:
4092
      lu.LogWarning("Can't find disk on node %s", node)
4093
      result = False
4094
    else:
4095
      if ldisk:
4096
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4097
      else:
4098
        result = result and not rstats.