#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import query
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc

import ganeti.masterd.instance # pylint: disable=W0611


#: Size of DRBD meta block device
DRBD_META_SIZE = 128

# States of instance
INSTANCE_UP = [constants.ADMINST_UP]
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_OFFLINE = [constants.ADMINST_OFFLINE]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]


class ResultWithJobs:
  """Data container for LU results with jobs.

  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
  contained in the C{jobs} attribute and include the job IDs in the opcode
  result.

  """
  def __init__(self, jobs, **kwargs):
    """Initializes this class.

    Additional return values can be specified as keyword arguments.

    @type jobs: list of lists of L{opcodes.OpCode}
    @param jobs: A list of lists of opcode objects

    """
    self.jobs = jobs
    self.other = kwargs


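# Illustrative sketch (added comment, not part of the original module): an
# LU's Exec method could hand follow-up work back to the master like
#   return ResultWithJobs([[opcodes.OpClusterVerifyConfig()]], verified=True)
# where each inner list of opcodes becomes one submitted job and the extra
# keyword argument ends up in the "other" dict of the result.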
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - implement BuildHooksNodes
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc_runner):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.glm = context.glm
    # readability alias
    self.owned_locks = context.glm.list_owned
    self.context = context
    self.rpc = rpc_runner
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging
    self.Log = processor.Log # pylint: disable=C0103
    self.LogWarning = processor.LogWarning # pylint: disable=C0103
    self.LogInfo = processor.LogInfo # pylint: disable=C0103
    self.LogStep = processor.LogStep # pylint: disable=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    # Validate opcode parameters and set defaults
    self.op.Validate(True)

    self.CheckArguments()

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
184
    """Expand names for this LU.
185

186
    This method is called before starting to execute the opcode, and it should
187
    update all the parameters of the opcode to their canonical form (e.g. a
188
    short node name must be fully expanded after this method has successfully
189
    completed). This way locking, hooks, logging, etc. can work correctly.
190

191
    LUs which implement this method must also populate the self.needed_locks
192
    member, as a dict with lock levels as keys, and a list of needed lock names
193
    as values. Rules:
194

195
      - use an empty dict if you don't need any lock
196
      - if you don't need any lock at a particular level omit that level
197
      - don't put anything for the BGL level
198
      - if you want all locks at a level use locking.ALL_SET as a value
199

200
    If you need to share locks (rather than acquire them exclusively) at one
201
    level you can modify self.share_locks, setting a true value (usually 1) for
202
    that level. By default locks are not shared.
203

204
    This function can also define a list of tasklets, which then will be
205
    executed in order instead of the usual LU-level CheckPrereq and Exec
206
    functions, if those are not defined by the LU.
207

208
    Examples::
209

210
      # Acquire all nodes and one instance
211
      self.needed_locks = {
212
        locking.LEVEL_NODE: locking.ALL_SET,
213
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
214
      }
215
      # Acquire just two nodes
216
      self.needed_locks = {
217
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
218
      }
219
      # Acquire no locks
220
      self.needed_locks = {} # No, you can't leave it to the default value None
221

222
    """
223
    # The implementation of this method is mandatory only if the new LU is
224
    # concurrent, so that old LUs don't need to be changed all at the same
225
    # time.
226
    if self.REQ_BGL:
227
      self.needed_locks = {} # Exclusive LUs don't need locks.
228
    else:
229
      raise NotImplementedError
230
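  # Illustrative sketch (added comment, not part of the original): a
  # non-BGL LU that wants its node locks shared rather than exclusive would
  # typically pair the needed_locks declaration described above with
  #   self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #   self.share_locks[locking.LEVEL_NODE] = 1
  # in its own ExpandNames implementation.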

    
231
  def DeclareLocks(self, level):
232
    """Declare LU locking needs for a level
233

234
    While most LUs can just declare their locking needs at ExpandNames time,
235
    sometimes there's the need to calculate some locks after having acquired
236
    the ones before. This function is called just before acquiring locks at a
237
    particular level, but after acquiring the ones at lower levels, and permits
238
    such calculations. It can be used to modify self.needed_locks, and by
239
    default it does nothing.
240

241
    This function is only called if you have something already set in
242
    self.needed_locks for the level.
243

244
    @param level: Locking level which is going to be locked
245
    @type level: member of ganeti.locking.LEVELS
246

247
    """
248

    
249
  def CheckPrereq(self):
250
    """Check prerequisites for this LU.
251

252
    This method should check that the prerequisites for the execution
253
    of this LU are fulfilled. It can do internode communication, but
254
    it should be idempotent - no cluster or system changes are
255
    allowed.
256

257
    The method should raise errors.OpPrereqError in case something is
258
    not fulfilled. Its return value is ignored.
259

260
    This method should also update all the parameters of the opcode to
261
    their canonical form if it hasn't been done by ExpandNames before.
262

263
    """
264
    if self.tasklets is not None:
265
      for (idx, tl) in enumerate(self.tasklets):
266
        logging.debug("Checking prerequisites for tasklet %s/%s",
267
                      idx + 1, len(self.tasklets))
268
        tl.CheckPrereq()
269
    else:
270
      pass
271

    
272
  def Exec(self, feedback_fn):
273
    """Execute the LU.
274

275
    This method should implement the actual work. It should raise
276
    errors.OpExecError for failures that are somewhat dealt with in
277
    code, or expected.
278

279
    """
280
    if self.tasklets is not None:
281
      for (idx, tl) in enumerate(self.tasklets):
282
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
283
        tl.Exec(feedback_fn)
284
    else:
285
      raise NotImplementedError
286

    
287
  def BuildHooksEnv(self):
288
    """Build hooks environment for this LU.
289

290
    @rtype: dict
291
    @return: Dictionary containing the environment that will be used for
292
      running the hooks for this LU. The keys of the dict must not be prefixed
293
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
294
      will extend the environment with additional variables. If no environment
295
      should be defined, an empty dictionary should be returned (not C{None}).
296
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
297
      will not be called.
298

299
    """
300
    raise NotImplementedError
301

    
302
  def BuildHooksNodes(self):
303
    """Build list of nodes to run LU's hooks.
304

305
    @rtype: tuple; (list, list)
306
    @return: Tuple containing a list of node names on which the hook
307
      should run before the execution and a list of node names on which the
308
      hook should run after the execution. No nodes should be returned as an
309
      empty list (and not None).
310
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
311
      will not be called.
312

313
    """
314
    raise NotImplementedError
315

    
316
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
317
    """Notify the LU about the results of its hooks.
318

319
    This method is called every time a hooks phase is executed, and notifies
320
    the Logical Unit about the hooks' result. The LU can then use it to alter
321
    its result based on the hooks.  By default the method does nothing and the
322
    previous result is passed back unchanged but any LU can define it if it
323
    wants to use the local cluster hook-scripts somehow.
324

325
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
326
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
327
    @param hook_results: the results of the multi-node hooks rpc call
328
    @param feedback_fn: function used to send feedback back to the caller
329
    @param lu_result: the previous Exec result this LU had, or None
330
        in the PRE phase
331
    @return: the new Exec result, based on the previous result
332
        and hook results
333

334
    """
335
    # The API must be kept, thus we ignore the unused-argument and
336
    # could-be-a-function warnings
337
    # pylint: disable=W0613,R0201
338
    return lu_result
339

    
340
  def _ExpandAndLockInstance(self):
341
    """Helper function to expand and lock an instance.
342

343
    Many LUs that work on an instance take its name in self.op.instance_name
344
    and need to expand it and then declare the expanded name for locking. This
345
    function does it, and then updates self.op.instance_name to the expanded
346
    name. It also initializes needed_locks as a dict, if this hasn't been done
347
    before.
348

349
    """
350
    if self.needed_locks is None:
351
      self.needed_locks = {}
352
    else:
353
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
354
        "_ExpandAndLockInstance called with instance-level locks set"
355
    self.op.instance_name = _ExpandInstanceName(self.cfg,
356
                                                self.op.instance_name)
357
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
358
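  # Illustrative sketch (added comment, not part of the original): an
  # instance-level LU would normally call the helper above from its
  # ExpandNames, e.g.
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  # after which self.op.instance_name holds the fully expanded name and the
  # instance lock has been declared.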

    
359
  def _LockInstancesNodes(self, primary_only=False,
360
                          level=locking.LEVEL_NODE):
361
    """Helper function to declare instances' nodes for locking.
362

363
    This function should be called after locking one or more instances to lock
364
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
365
    with all primary or secondary nodes for instances already locked and
366
    present in self.needed_locks[locking.LEVEL_INSTANCE].
367

368
    It should be called from DeclareLocks, and for safety only works if
369
    self.recalculate_locks[locking.LEVEL_NODE] is set.
370

371
    In the future it may grow parameters to lock only some instances' nodes, or
372
    to lock only primary or secondary nodes, if needed.
373

374
    It should be called from DeclareLocks in a way similar to::
375

376
      if level == locking.LEVEL_NODE:
377
        self._LockInstancesNodes()
378

379
    @type primary_only: boolean
380
    @param primary_only: only lock primary nodes of locked instances
381
    @param level: Which lock level to use for locking nodes
382

383
    """
384
    assert level in self.recalculate_locks, \
385
      "_LockInstancesNodes helper function called with no nodes to recalculate"
386

    
387
    # TODO: check if we've really been called with the instance locks held
388

    
389
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
390
    # future we might want to have different behaviors depending on the value
391
    # of self.recalculate_locks[locking.LEVEL_NODE]
392
    wanted_nodes = []
393
    locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
394
    for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
395
      wanted_nodes.append(instance.primary_node)
396
      if not primary_only:
397
        wanted_nodes.extend(instance.secondary_nodes)
398

    
399
    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
400
      self.needed_locks[level] = wanted_nodes
401
    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
402
      self.needed_locks[level].extend(wanted_nodes)
403
    else:
404
      raise errors.ProgrammerError("Unknown recalculation mode")
405

    
406
    del self.recalculate_locks[level]
407

    
408

    
409
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
410
  """Simple LU which runs no hooks.
411

412
  This LU is intended as a parent for other LogicalUnits which will
413
  run no hooks, in order to reduce duplicate code.
414

415
  """
416
  HPATH = None
417
  HTYPE = None
418

    
419
  def BuildHooksEnv(self):
420
    """Empty BuildHooksEnv for NoHooksLu.
421

422
    This just raises an error.
423

424
    """
425
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
426

    
427
  def BuildHooksNodes(self):
428
    """Empty BuildHooksNodes for NoHooksLU.
429

430
    """
431
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
432

    
433

    
434
class Tasklet:
435
  """Tasklet base class.
436

437
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
438
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
439
  tasklets know nothing about locks.
440

441
  Subclasses must follow these rules:
442
    - Implement CheckPrereq
443
    - Implement Exec
444

445
  """
446
  def __init__(self, lu):
447
    self.lu = lu
448

    
449
    # Shortcuts
450
    self.cfg = lu.cfg
451
    self.rpc = lu.rpc
452

    
453
  def CheckPrereq(self):
454
    """Check prerequisites for this tasklets.
455

456
    This method should check whether the prerequisites for the execution of
457
    this tasklet are fulfilled. It can do internode communication, but it
458
    should be idempotent - no cluster or system changes are allowed.
459

460
    The method should raise errors.OpPrereqError in case something is not
461
    fulfilled. Its return value is ignored.
462

463
    This method should also update all parameters to their canonical form if it
464
    hasn't been done before.
465

466
    """
467
    pass
468

    
469
  def Exec(self, feedback_fn):
470
    """Execute the tasklet.
471

472
    This method should implement the actual work. It should raise
473
    errors.OpExecError for failures that are somewhat dealt with in code, or
474
    expected.
475

476
    """
477
    raise NotImplementedError
478

    
479

    
480
class _QueryBase:
481
  """Base for query utility classes.
482

483
  """
484
  #: Attribute holding field definitions
485
  FIELDS = None
486

    
487
  def __init__(self, qfilter, fields, use_locking):
488
    """Initializes this class.
489

490
    """
491
    self.use_locking = use_locking
492

    
493
    self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
494
                             namefield="name")
495
    self.requested_data = self.query.RequestedData()
496
    self.names = self.query.RequestedNames()
497

    
498
    # Sort only if no names were requested
499
    self.sort_by_name = not self.names
500

    
501
    self.do_locking = None
502
    self.wanted = None
503

    
504
  def _GetNames(self, lu, all_names, lock_level):
505
    """Helper function to determine names asked for in the query.
506

507
    """
508
    if self.do_locking:
509
      names = lu.owned_locks(lock_level)
510
    else:
511
      names = all_names
512

    
513
    if self.wanted == locking.ALL_SET:
514
      assert not self.names
515
      # caller didn't specify names, so ordering is not important
516
      return utils.NiceSort(names)
517

    
518
    # caller specified names and we must keep the same order
519
    assert self.names
520
    assert not self.do_locking or lu.glm.is_owned(lock_level)
521

    
522
    missing = set(self.wanted).difference(names)
523
    if missing:
524
      raise errors.OpExecError("Some items were removed before retrieving"
525
                               " their data: %s" % missing)
526

    
527
    # Return expanded names
528
    return self.wanted
529

    
530
  def ExpandNames(self, lu):
531
    """Expand names for this query.
532

533
    See L{LogicalUnit.ExpandNames}.
534

535
    """
536
    raise NotImplementedError()
537

    
538
  def DeclareLocks(self, lu, level):
539
    """Declare locks for this query.
540

541
    See L{LogicalUnit.DeclareLocks}.
542

543
    """
544
    raise NotImplementedError()
545

    
546
  def _GetQueryData(self, lu):
547
    """Collects all data for this query.
548

549
    @return: Query data object
550

551
    """
552
    raise NotImplementedError()
553

    
554
  def NewStyleQuery(self, lu):
555
    """Collect data and execute query.
556

557
    """
558
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
559
                                  sort_by_name=self.sort_by_name)
560

    
561
  def OldStyleQuery(self, lu):
562
    """Collect data and execute query.
563

564
    """
565
    return self.query.OldStyleQuery(self._GetQueryData(lu),
566
                                    sort_by_name=self.sort_by_name)
567

    
568

    
569
def _ShareAll():
570
  """Returns a dict declaring all lock levels shared.
571

572
  """
573
  return dict.fromkeys(locking.LEVELS, 1)
574

    
575

    
576
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
577
  """Checks if the owned node groups are still correct for an instance.
578

579
  @type cfg: L{config.ConfigWriter}
580
  @param cfg: The cluster configuration
581
  @type instance_name: string
582
  @param instance_name: Instance name
583
  @type owned_groups: set or frozenset
584
  @param owned_groups: List of currently owned node groups
585

586
  """
587
  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
588

    
589
  if not owned_groups.issuperset(inst_groups):
590
    raise errors.OpPrereqError("Instance %s's node groups changed since"
591
                               " locks were acquired, current groups are"
592
                               " are '%s', owning groups '%s'; retry the"
593
                               " operation" %
594
                               (instance_name,
595
                                utils.CommaJoin(inst_groups),
596
                                utils.CommaJoin(owned_groups)),
597
                               errors.ECODE_STATE)
598

    
599
  return inst_groups
600

    
601

    
602
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
603
  """Checks if the instances in a node group are still correct.
604

605
  @type cfg: L{config.ConfigWriter}
606
  @param cfg: The cluster configuration
607
  @type group_uuid: string
608
  @param group_uuid: Node group UUID
609
  @type owned_instances: set or frozenset
610
  @param owned_instances: List of currently owned instances
611

612
  """
613
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
614
  if owned_instances != wanted_instances:
615
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
616
                               " locks were acquired, wanted '%s', have '%s';"
617
                               " retry the operation" %
618
                               (group_uuid,
619
                                utils.CommaJoin(wanted_instances),
620
                                utils.CommaJoin(owned_instances)),
621
                               errors.ECODE_STATE)
622

    
623
  return wanted_instances
624

    
625

    
626
def _SupportsOob(cfg, node):
627
  """Tells if node supports OOB.
628

629
  @type cfg: L{config.ConfigWriter}
630
  @param cfg: The cluster configuration
631
  @type node: L{objects.Node}
632
  @param node: The node
633
  @return: The OOB script if supported or an empty string otherwise
634

635
  """
636
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
637

    
638

    
639
def _GetWantedNodes(lu, nodes):
640
  """Returns list of checked and expanded node names.
641

642
  @type lu: L{LogicalUnit}
643
  @param lu: the logical unit on whose behalf we execute
644
  @type nodes: list
645
  @param nodes: list of node names or None for all nodes
646
  @rtype: list
647
  @return: the list of nodes, sorted
648
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
649

650
  """
651
  if nodes:
652
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
653

    
654
  return utils.NiceSort(lu.cfg.GetNodeList())
655

    
656

    
657
def _GetWantedInstances(lu, instances):
658
  """Returns list of checked and expanded instance names.
659

660
  @type lu: L{LogicalUnit}
661
  @param lu: the logical unit on whose behalf we execute
662
  @type instances: list
663
  @param instances: list of instance names or None for all instances
664
  @rtype: list
665
  @return: the list of instances, sorted
666
  @raise errors.OpPrereqError: if the instances parameter is wrong type
667
  @raise errors.OpPrereqError: if any of the passed instances is not found
668

669
  """
670
  if instances:
671
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
672
  else:
673
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
674
  return wanted
675

    
676

    
677
def _GetUpdatedParams(old_params, update_dict,
678
                      use_default=True, use_none=False):
679
  """Return the new version of a parameter dictionary.
680

681
  @type old_params: dict
682
  @param old_params: old parameters
683
  @type update_dict: dict
684
  @param update_dict: dict containing new parameter values, or
685
      constants.VALUE_DEFAULT to reset the parameter to its default
686
      value
687
  @type use_default: boolean
688
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
689
      values as 'to be deleted' values
690
  @type use_none: boolean
691
  @param use_none: whether to recognise C{None} values as 'to be
692
      deleted' values
693
  @rtype: dict
694
  @return: the new parameter dictionary
695

696
  """
697
  params_copy = copy.deepcopy(old_params)
698
  for key, val in update_dict.iteritems():
699
    if ((use_default and val == constants.VALUE_DEFAULT) or
700
        (use_none and val is None)):
701
      try:
702
        del params_copy[key]
703
      except KeyError:
704
        pass
705
    else:
706
      params_copy[key] = val
707
  return params_copy
708
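# Worked example (added comment, not part of the original module): with
#   old_params = {"mem": 128, "vcpus": 2}
#   update_dict = {"mem": constants.VALUE_DEFAULT, "nics": 1}
# _GetUpdatedParams(old_params, update_dict) returns {"vcpus": 2, "nics": 1}:
# "mem" is reset to its default by being dropped from the copy, "vcpus" is
# kept unchanged and "nics" is added.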

    
709

    
710
def _ReleaseLocks(lu, level, names=None, keep=None):
711
  """Releases locks owned by an LU.
712

713
  @type lu: L{LogicalUnit}
714
  @param level: Lock level
715
  @type names: list or None
716
  @param names: Names of locks to release
717
  @type keep: list or None
718
  @param keep: Names of locks to retain
719

720
  """
721
  assert not (keep is not None and names is not None), \
722
         "Only one of the 'names' and the 'keep' parameters can be given"
723

    
724
  if names is not None:
725
    should_release = names.__contains__
726
  elif keep:
727
    should_release = lambda name: name not in keep
728
  else:
729
    should_release = None
730

    
731
  owned = lu.owned_locks(level)
732
  if not owned:
733
    # Not owning any lock at this level, do nothing
734
    pass
735

    
736
  elif should_release:
737
    retain = []
738
    release = []
739

    
740
    # Determine which locks to release
741
    for name in owned:
742
      if should_release(name):
743
        release.append(name)
744
      else:
745
        retain.append(name)
746

    
747
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
748

    
749
    # Release just some locks
750
    lu.glm.release(level, names=release)
751

    
752
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
753
  else:
754
    # Release everything
755
    lu.glm.release(level)
756

    
757
    assert not lu.glm.is_owned(level), "No locks should be owned"
758
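# Illustrative usage (added comment, not part of the original module):
#   _ReleaseLocks(lu, locking.LEVEL_NODE, keep=[node_name])
# keeps only node_name locked at the node level, while passing names=[...]
# instead releases exactly the listed locks; "names" and "keep" are mutually
# exclusive, and omitting both releases every lock owned at that level.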

    
759

    
760
def _MapInstanceDisksToNodes(instances):
761
  """Creates a map from (node, volume) to instance name.
762

763
  @type instances: list of L{objects.Instance}
764
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
765

766
  """
767
  return dict(((node, vol), inst.name)
768
              for inst in instances
769
              for (node, vols) in inst.MapLVsByNode().items()
770
              for vol in vols)
771
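# Illustrative example (added comment, not part of the original module): an
# instance "inst1.example.com" whose MapLVsByNode() yields
# {"node1": ["xenvg/disk0"]} contributes the entry
#   ("node1", "xenvg/disk0"): "inst1.example.com"
# to the returned dictionary.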

    
772

    
773
def _RunPostHook(lu, node_name):
774
  """Runs the post-hook for an opcode on a single node.
775

776
  """
777
  hm = lu.proc.BuildHooksManager(lu)
778
  try:
779
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
780
  except:
781
    # pylint: disable=W0702
782
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
783

    
784

    
785
def _CheckOutputFields(static, dynamic, selected):
786
  """Checks whether all selected fields are valid.
787

788
  @type static: L{utils.FieldSet}
789
  @param static: static fields set
790
  @type dynamic: L{utils.FieldSet}
791
  @param dynamic: dynamic fields set
792

793
  """
794
  f = utils.FieldSet()
795
  f.Extend(static)
796
  f.Extend(dynamic)
797

    
798
  delta = f.NonMatching(selected)
799
  if delta:
800
    raise errors.OpPrereqError("Unknown output fields selected: %s"
801
                               % ",".join(delta), errors.ECODE_INVAL)
802

    
803

    
804
def _CheckGlobalHvParams(params):
805
  """Validates that given hypervisor params are not global ones.
806

807
  This will ensure that instances don't get customised versions of
808
  global params.
809

810
  """
811
  used_globals = constants.HVC_GLOBALS.intersection(params)
812
  if used_globals:
813
    msg = ("The following hypervisor parameters are global and cannot"
814
           " be customized at instance level, please modify them at"
815
           " cluster level: %s" % utils.CommaJoin(used_globals))
816
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
817

    
818

    
819
def _CheckNodeOnline(lu, node, msg=None):
820
  """Ensure that a given node is online.
821

822
  @param lu: the LU on behalf of which we make the check
823
  @param node: the node to check
824
  @param msg: if passed, should be a message to replace the default one
825
  @raise errors.OpPrereqError: if the node is offline
826

827
  """
828
  if msg is None:
829
    msg = "Can't use offline node"
830
  if lu.cfg.GetNodeInfo(node).offline:
831
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
832

    
833

    
834
def _CheckNodeNotDrained(lu, node):
835
  """Ensure that a given node is not drained.
836

837
  @param lu: the LU on behalf of which we make the check
838
  @param node: the node to check
839
  @raise errors.OpPrereqError: if the node is drained
840

841
  """
842
  if lu.cfg.GetNodeInfo(node).drained:
843
    raise errors.OpPrereqError("Can't use drained node %s" % node,
844
                               errors.ECODE_STATE)
845

    
846

    
847
def _CheckNodeVmCapable(lu, node):
848
  """Ensure that a given node is vm capable.
849

850
  @param lu: the LU on behalf of which we make the check
851
  @param node: the node to check
852
  @raise errors.OpPrereqError: if the node is not vm capable
853

854
  """
855
  if not lu.cfg.GetNodeInfo(node).vm_capable:
856
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
857
                               errors.ECODE_STATE)
858

    
859

    
860
def _CheckNodeHasOS(lu, node, os_name, force_variant):
861
  """Ensure that a node supports a given OS.
862

863
  @param lu: the LU on behalf of which we make the check
864
  @param node: the node to check
865
  @param os_name: the OS to query about
866
  @param force_variant: whether to ignore variant errors
867
  @raise errors.OpPrereqError: if the node is not supporting the OS
868

869
  """
870
  result = lu.rpc.call_os_get(node, os_name)
871
  result.Raise("OS '%s' not in supported OS list for node %s" %
872
               (os_name, node),
873
               prereq=True, ecode=errors.ECODE_INVAL)
874
  if not force_variant:
875
    _CheckOSVariant(result.payload, os_name)
876

    
877

    
878
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
879
  """Ensure that a node has the given secondary ip.
880

881
  @type lu: L{LogicalUnit}
882
  @param lu: the LU on behalf of which we make the check
883
  @type node: string
884
  @param node: the node to check
885
  @type secondary_ip: string
886
  @param secondary_ip: the ip to check
887
  @type prereq: boolean
888
  @param prereq: whether to throw a prerequisite or an execute error
889
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
890
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
891

892
  """
893
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
894
  result.Raise("Failure checking secondary ip on node %s" % node,
895
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
896
  if not result.payload:
897
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
898
           " please fix and re-run this command" % secondary_ip)
899
    if prereq:
900
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
901
    else:
902
      raise errors.OpExecError(msg)
903

    
904

    
905
def _GetClusterDomainSecret():
906
  """Reads the cluster domain secret.
907

908
  """
909
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
910
                               strict=True)
911

    
912

    
913
def _CheckInstanceState(lu, instance, req_states, msg=None):
914
  """Ensure that an instance is in one of the required states.
915

916
  @param lu: the LU on behalf of which we make the check
917
  @param instance: the instance to check
918
  @param msg: if passed, should be a message to replace the default one
919
  @raise errors.OpPrereqError: if the instance is not in the required state
920

921
  """
922
  if msg is None:
923
    msg = "can't use instance from outside %s states" % ", ".join(req_states)
924
  if instance.admin_state not in req_states:
925
    raise errors.OpPrereqError("Instance %s is marked to be %s, %s" %
926
                               (instance, instance.admin_state, msg),
927
                               errors.ECODE_STATE)
928

    
929
  if constants.ADMINST_UP not in req_states:
930
    pnode = instance.primary_node
931
    ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
932
    ins_l.Raise("Can't contact node %s for instance information" % pnode,
933
                prereq=True, ecode=errors.ECODE_ENVIRON)
934

    
935
    if instance.name in ins_l.payload:
936
      raise errors.OpPrereqError("Instance %s is running, %s" %
937
                                 (instance.name, msg), errors.ECODE_STATE)
938

    
939

    
940
def _ExpandItemName(fn, name, kind):
941
  """Expand an item name.
942

943
  @param fn: the function to use for expansion
944
  @param name: requested item name
945
  @param kind: text description ('Node' or 'Instance')
946
  @return: the resolved (full) name
947
  @raise errors.OpPrereqError: if the item is not found
948

949
  """
950
  full_name = fn(name)
951
  if full_name is None:
952
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
953
                               errors.ECODE_NOENT)
954
  return full_name
955

    
956

    
957
def _ExpandNodeName(cfg, name):
958
  """Wrapper over L{_ExpandItemName} for nodes."""
959
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
960

    
961

    
962
def _ExpandInstanceName(cfg, name):
963
  """Wrapper over L{_ExpandItemName} for instance."""
964
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
965

    
966

    
967
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
968
                          minmem, maxmem, vcpus, nics, disk_template, disks,
969
                          bep, hvp, hypervisor_name, tags):
970
  """Builds instance related env variables for hooks
971

972
  This builds the hook environment from individual variables.
973

974
  @type name: string
975
  @param name: the name of the instance
976
  @type primary_node: string
977
  @param primary_node: the name of the instance's primary node
978
  @type secondary_nodes: list
979
  @param secondary_nodes: list of secondary nodes as strings
980
  @type os_type: string
981
  @param os_type: the name of the instance's OS
982
  @type status: string
983
  @param status: the desired status of the instance
984
  @type minmem: string
985
  @param minmem: the minimum memory size of the instance
986
  @type maxmem: string
987
  @param maxmem: the maximum memory size of the instance
988
  @type vcpus: string
989
  @param vcpus: the count of VCPUs the instance has
990
  @type nics: list
991
  @param nics: list of tuples (ip, mac, mode, link) representing
992
      the NICs the instance has
993
  @type disk_template: string
994
  @param disk_template: the disk template of the instance
995
  @type disks: list
996
  @param disks: the list of (size, mode) pairs
997
  @type bep: dict
998
  @param bep: the backend parameters for the instance
999
  @type hvp: dict
1000
  @param hvp: the hypervisor parameters for the instance
1001
  @type hypervisor_name: string
1002
  @param hypervisor_name: the hypervisor for the instance
1003
  @type tags: list
1004
  @param tags: list of instance tags as strings
1005
  @rtype: dict
1006
  @return: the hook environment for this instance
1007

1008
  """
1009
  env = {
1010
    "OP_TARGET": name,
1011
    "INSTANCE_NAME": name,
1012
    "INSTANCE_PRIMARY": primary_node,
1013
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
1014
    "INSTANCE_OS_TYPE": os_type,
1015
    "INSTANCE_STATUS": status,
1016
    "INSTANCE_MINMEM": minmem,
1017
    "INSTANCE_MAXMEM": maxmem,
1018
    # TODO(2.7) remove deprecated "memory" value
1019
    "INSTANCE_MEMORY": maxmem,
1020
    "INSTANCE_VCPUS": vcpus,
1021
    "INSTANCE_DISK_TEMPLATE": disk_template,
1022
    "INSTANCE_HYPERVISOR": hypervisor_name,
1023
  }
1024
  if nics:
1025
    nic_count = len(nics)
1026
    for idx, (ip, mac, mode, link) in enumerate(nics):
1027
      if ip is None:
1028
        ip = ""
1029
      env["INSTANCE_NIC%d_IP" % idx] = ip
1030
      env["INSTANCE_NIC%d_MAC" % idx] = mac
1031
      env["INSTANCE_NIC%d_MODE" % idx] = mode
1032
      env["INSTANCE_NIC%d_LINK" % idx] = link
1033
      if mode == constants.NIC_MODE_BRIDGED:
1034
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1035
  else:
1036
    nic_count = 0
1037

    
1038
  env["INSTANCE_NIC_COUNT"] = nic_count
1039

    
1040
  if disks:
1041
    disk_count = len(disks)
1042
    for idx, (size, mode) in enumerate(disks):
1043
      env["INSTANCE_DISK%d_SIZE" % idx] = size
1044
      env["INSTANCE_DISK%d_MODE" % idx] = mode
1045
  else:
1046
    disk_count = 0
1047

    
1048
  env["INSTANCE_DISK_COUNT"] = disk_count
1049

    
1050
  if not tags:
1051
    tags = []
1052

    
1053
  env["INSTANCE_TAGS"] = " ".join(tags)
1054

    
1055
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
1056
    for key, value in source.items():
1057
      env["INSTANCE_%s_%s" % (kind, key)] = value
1058

    
1059
  return env
1060
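# Illustrative sketch (added comment, not part of the original module): for an
# instance with a single NIC and a single disk the returned environment
# contains keys such as INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_NIC_COUNT,
# INSTANCE_NIC0_MAC, INSTANCE_DISK_COUNT and INSTANCE_DISK0_SIZE; the hooks
# runner later prefixes every key with "GANETI_".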

    
1061

    
1062
def _NICListToTuple(lu, nics):
1063
  """Build a list of nic information tuples.
1064

1065
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1066
  value in LUInstanceQueryData.
1067

1068
  @type lu:  L{LogicalUnit}
1069
  @param lu: the logical unit on whose behalf we execute
1070
  @type nics: list of L{objects.NIC}
1071
  @param nics: list of nics to convert to hooks tuples
1072

1073
  """
1074
  hooks_nics = []
1075
  cluster = lu.cfg.GetClusterInfo()
1076
  for nic in nics:
1077
    ip = nic.ip
1078
    mac = nic.mac
1079
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
1080
    mode = filled_params[constants.NIC_MODE]
1081
    link = filled_params[constants.NIC_LINK]
1082
    hooks_nics.append((ip, mac, mode, link))
1083
  return hooks_nics
1084

    
1085

    
1086
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1087
  """Builds instance related env variables for hooks from an object.
1088

1089
  @type lu: L{LogicalUnit}
1090
  @param lu: the logical unit on whose behalf we execute
1091
  @type instance: L{objects.Instance}
1092
  @param instance: the instance for which we should build the
1093
      environment
1094
  @type override: dict
1095
  @param override: dictionary with key/values that will override
1096
      our values
1097
  @rtype: dict
1098
  @return: the hook environment dictionary
1099

1100
  """
1101
  cluster = lu.cfg.GetClusterInfo()
1102
  bep = cluster.FillBE(instance)
1103
  hvp = cluster.FillHV(instance)
1104
  args = {
1105
    "name": instance.name,
1106
    "primary_node": instance.primary_node,
1107
    "secondary_nodes": instance.secondary_nodes,
1108
    "os_type": instance.os,
1109
    "status": instance.admin_state,
1110
    "maxmem": bep[constants.BE_MAXMEM],
1111
    "minmem": bep[constants.BE_MINMEM],
1112
    "vcpus": bep[constants.BE_VCPUS],
1113
    "nics": _NICListToTuple(lu, instance.nics),
1114
    "disk_template": instance.disk_template,
1115
    "disks": [(disk.size, disk.mode) for disk in instance.disks],
1116
    "bep": bep,
1117
    "hvp": hvp,
1118
    "hypervisor_name": instance.hypervisor,
1119
    "tags": instance.tags,
1120
  }
1121
  if override:
1122
    args.update(override)
1123
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1124

    
1125

    
1126
def _AdjustCandidatePool(lu, exceptions):
1127
  """Adjust the candidate pool after node operations.
1128

1129
  """
1130
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1131
  if mod_list:
1132
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1133
               utils.CommaJoin(node.name for node in mod_list))
1134
    for name in mod_list:
1135
      lu.context.ReaddNode(name)
1136
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1137
  if mc_now > mc_max:
1138
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1139
               (mc_now, mc_max))
1140

    
1141

    
1142
def _DecideSelfPromotion(lu, exceptions=None):
1143
  """Decide whether I should promote myself as a master candidate.
1144

1145
  """
1146
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1147
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1148
  # the new node will increase mc_max with one, so:
1149
  mc_should = min(mc_should + 1, cp_size)
1150
  return mc_now < mc_should
1151

    
1152

    
1153
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1154
  """Check that the brigdes needed by a list of nics exist.
1155

1156
  """
1157
  cluster = lu.cfg.GetClusterInfo()
1158
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1159
  brlist = [params[constants.NIC_LINK] for params in paramslist
1160
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1161
  if brlist:
1162
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1163
    result.Raise("Error checking bridges on destination node '%s'" %
1164
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1165

    
1166

    
1167
def _CheckInstanceBridgesExist(lu, instance, node=None):
1168
  """Check that the brigdes needed by an instance exist.
1169

1170
  """
1171
  if node is None:
1172
    node = instance.primary_node
1173
  _CheckNicsBridgesExist(lu, instance.nics, node)
1174

    
1175

    
1176
def _CheckOSVariant(os_obj, name):
1177
  """Check whether an OS name conforms to the os variants specification.
1178

1179
  @type os_obj: L{objects.OS}
1180
  @param os_obj: OS object to check
1181
  @type name: string
1182
  @param name: OS name passed by the user, to check for validity
1183

1184
  """
1185
  variant = objects.OS.GetVariant(name)
1186
  if not os_obj.supported_variants:
1187
    if variant:
1188
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1189
                                 " passed)" % (os_obj.name, variant),
1190
                                 errors.ECODE_INVAL)
1191
    return
1192
  if not variant:
1193
    raise errors.OpPrereqError("OS name must include a variant",
1194
                               errors.ECODE_INVAL)
1195

    
1196
  if variant not in os_obj.supported_variants:
1197
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1198

    
1199

    
1200
def _GetNodeInstancesInner(cfg, fn):
1201
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1202

    
1203

    
1204
def _GetNodeInstances(cfg, node_name):
1205
  """Returns a list of all primary and secondary instances on a node.
1206

1207
  """
1208

    
1209
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1210

    
1211

    
1212
def _GetNodePrimaryInstances(cfg, node_name):
1213
  """Returns primary instances on a node.
1214

1215
  """
1216
  return _GetNodeInstancesInner(cfg,
1217
                                lambda inst: node_name == inst.primary_node)
1218

    
1219

    
1220
def _GetNodeSecondaryInstances(cfg, node_name):
1221
  """Returns secondary instances on a node.
1222

1223
  """
1224
  return _GetNodeInstancesInner(cfg,
1225
                                lambda inst: node_name in inst.secondary_nodes)
1226

    
1227

    
1228
def _GetStorageTypeArgs(cfg, storage_type):
1229
  """Returns the arguments for a storage type.
1230

1231
  """
1232
  # Special case for file storage
1233
  if storage_type == constants.ST_FILE:
1234
    # storage.FileStorage wants a list of storage directories
1235
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1236

    
1237
  return []
1238

    
1239

    
1240
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1241
  faulty = []
1242

    
1243
  for dev in instance.disks:
1244
    cfg.SetDiskID(dev, node_name)
1245

    
1246
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
1247
  result.Raise("Failed to get disk status from node %s" % node_name,
1248
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1249

    
1250
  for idx, bdev_status in enumerate(result.payload):
1251
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1252
      faulty.append(idx)
1253

    
1254
  return faulty
1255

    
1256

    
1257
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1258
  """Check the sanity of iallocator and node arguments and use the
1259
  cluster-wide iallocator if appropriate.
1260

1261
  Check that at most one of (iallocator, node) is specified. If none is
1262
  specified, then the LU's opcode's iallocator slot is filled with the
1263
  cluster-wide default iallocator.
1264

1265
  @type iallocator_slot: string
1266
  @param iallocator_slot: the name of the opcode iallocator slot
1267
  @type node_slot: string
1268
  @param node_slot: the name of the opcode target node slot
1269

1270
  """
1271
  node = getattr(lu.op, node_slot, None)
1272
  iallocator = getattr(lu.op, iallocator_slot, None)
1273

    
1274
  if node is not None and iallocator is not None:
1275
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
1276
                               errors.ECODE_INVAL)
1277
  elif node is None and iallocator is None:
1278
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1279
    if default_iallocator:
1280
      setattr(lu.op, iallocator_slot, default_iallocator)
1281
    else:
1282
      raise errors.OpPrereqError("No iallocator or node given and no"
1283
                                 " cluster-wide default iallocator found;"
1284
                                 " please specify either an iallocator or a"
1285
                                 " node, or set a cluster-wide default"
1286
                                 " iallocator")
1287
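# Decision summary (added comment, not part of the original module): if both
# an iallocator and a node are given the opcode is rejected; if exactly one is
# given it is used as-is; if neither is given the cluster-wide default
# iallocator (when configured) is written back into the opcode's iallocator
# slot, otherwise an OpPrereqError is raised.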

    
1288

    
1289
def _GetDefaultIAllocator(cfg, iallocator):
1290
  """Decides on which iallocator to use.
1291

1292
  @type cfg: L{config.ConfigWriter}
1293
  @param cfg: Cluster configuration object
1294
  @type iallocator: string or None
1295
  @param iallocator: Iallocator specified in opcode
1296
  @rtype: string
1297
  @return: Iallocator name
1298

1299
  """
1300
  if not iallocator:
1301
    # Use default iallocator
1302
    iallocator = cfg.GetDefaultIAllocator()
1303

    
1304
  if not iallocator:
1305
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
1306
                               " opcode nor as a cluster-wide default",
1307
                               errors.ECODE_INVAL)
1308

    
1309
  return iallocator
1310

    
1311

    
1312
class LUClusterPostInit(LogicalUnit):
1313
  """Logical unit for running hooks after cluster initialization.
1314

1315
  """
1316
  HPATH = "cluster-init"
1317
  HTYPE = constants.HTYPE_CLUSTER
1318

    
1319
  def BuildHooksEnv(self):
1320
    """Build hooks env.
1321

1322
    """
1323
    return {
1324
      "OP_TARGET": self.cfg.GetClusterName(),
1325
      }
1326

    
1327
  def BuildHooksNodes(self):
1328
    """Build hooks nodes.
1329

1330
    """
1331
    return ([], [self.cfg.GetMasterNode()])
1332

    
1333
  def Exec(self, feedback_fn):
1334
    """Nothing to do.
1335

1336
    """
1337
    return True
1338

    
1339

    
1340
class LUClusterDestroy(LogicalUnit):
1341
  """Logical unit for destroying the cluster.
1342

1343
  """
1344
  HPATH = "cluster-destroy"
1345
  HTYPE = constants.HTYPE_CLUSTER
1346

    
1347
  def BuildHooksEnv(self):
1348
    """Build hooks env.
1349

1350
    """
1351
    return {
1352
      "OP_TARGET": self.cfg.GetClusterName(),
1353
      }
1354

    
1355
  def BuildHooksNodes(self):
1356
    """Build hooks nodes.
1357

1358
    """
1359
    return ([], [])
1360

    
1361
  def CheckPrereq(self):
1362
    """Check prerequisites.
1363

1364
    This checks whether the cluster is empty.
1365

1366
    Any errors are signaled by raising errors.OpPrereqError.
1367

1368
    """
1369
    master = self.cfg.GetMasterNode()
1370

    
1371
    nodelist = self.cfg.GetNodeList()
1372
    if len(nodelist) != 1 or nodelist[0] != master:
1373
      raise errors.OpPrereqError("There are still %d node(s) in"
1374
                                 " this cluster." % (len(nodelist) - 1),
1375
                                 errors.ECODE_INVAL)
1376
    instancelist = self.cfg.GetInstanceList()
1377
    if instancelist:
1378
      raise errors.OpPrereqError("There are still %d instance(s) in"
1379
                                 " this cluster." % len(instancelist),
1380
                                 errors.ECODE_INVAL)
1381

    
1382
  def Exec(self, feedback_fn):
1383
    """Destroys the cluster.
1384

1385
    """
1386
    master_params = self.cfg.GetMasterNetworkParameters()
1387

    
1388
    # Run post hooks on master node before it's removed
1389
    _RunPostHook(self, master_params.name)
1390

    
1391
    ems = self.cfg.GetUseExternalMipScript()
1392
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1393
                                                     master_params, ems)
1394
    result.Raise("Could not disable the master role")
1395

    
1396
    return master_params.name
1397

    
1398

    
1399
def _VerifyCertificate(filename):
1400
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1401

1402
  @type filename: string
1403
  @param filename: Path to PEM file
1404

1405
  """
1406
  try:
1407
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1408
                                           utils.ReadFile(filename))
1409
  except Exception, err: # pylint: disable=W0703
1410
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1411
            "Failed to load X509 certificate %s: %s" % (filename, err))
1412

    
1413
  (errcode, msg) = \
1414
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1415
                                constants.SSL_CERT_EXPIRATION_ERROR)
1416

    
1417
  if msg:
1418
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1419
  else:
1420
    fnamemsg = None
1421

    
1422
  if errcode is None:
1423
    return (None, fnamemsg)
1424
  elif errcode == utils.CERT_WARNING:
1425
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1426
  elif errcode == utils.CERT_ERROR:
1427
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1428

    
1429
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1430

    
1431

    
1432
def _GetAllHypervisorParameters(cluster, instances):
1433
  """Compute the set of all hypervisor parameters.
1434

1435
  @type cluster: L{objects.Cluster}
1436
  @param cluster: the cluster object
1437
  @param instances: list of L{objects.Instance}
1438
  @param instances: additional instances from which to obtain parameters
1439
  @rtype: list of (origin, hypervisor, parameters)
1440
  @return: a list with all parameters found, indicating the hypervisor they
1441
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1442

1443
  """
1444
  hvp_data = []
1445

    
1446
  for hv_name in cluster.enabled_hypervisors:
1447
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1448

    
1449
  for os_name, os_hvp in cluster.os_hvp.items():
1450
    for hv_name, hv_params in os_hvp.items():
1451
      if hv_params:
1452
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1453
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1454

    
1455
  # TODO: collapse identical parameter values in a single one
1456
  for instance in instances:
1457
    if instance.hvparams:
1458
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1459
                       cluster.FillHV(instance)))
1460

    
1461
  return hvp_data
1462

    
1463

    
1464
class _VerifyErrors(object):
1465
  """Mix-in for cluster/group verify LUs.
1466

1467
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1468
  self.op and self._feedback_fn to be available.)
1469

1470
  """
1471

    
1472
  ETYPE_FIELD = "code"
1473
  ETYPE_ERROR = "ERROR"
1474
  ETYPE_WARNING = "WARNING"
1475

    
1476
  def _Error(self, ecode, item, msg, *args, **kwargs):
1477
    """Format an error message.
1478

1479
    Based on the opcode's error_codes parameter, either format a
1480
    parseable error code, or a simpler error string.
1481

1482
    This must be called only from Exec and functions called from Exec.
1483

1484
    """
1485
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1486
    itype, etxt, _ = ecode
1487
    # first complete the msg
1488
    if args:
1489
      msg = msg % args
1490
    # then format the whole message
1491
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1492
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1493
    else:
1494
      if item:
1495
        item = " " + item
1496
      else:
1497
        item = ""
1498
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1499
    # and finally report it via the feedback_fn
1500
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1501
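  # Illustrative sketch (added comment, not part of the original module):
  # with the opcode's error_codes flag set, a call such as
  #   self._Error(constants.CV_ECLUSTERCFG, None, "duplicated MAC")
  # is reported through feedback_fn roughly as
  #   "ERROR:ECLUSTERCFG:cluster:None:duplicated MAC"
  # while without error_codes it is rendered as "ERROR: cluster: duplicated MAC".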

    
1502
  def _ErrorIf(self, cond, ecode, *args, **kwargs):
1503
    """Log an error message if the passed condition is True.
1504

1505
    """
1506
    cond = (bool(cond)
1507
            or self.op.debug_simulate_errors) # pylint: disable=E1101
1508

    
1509
    # If the error code is in the list of ignored errors, demote the error to a
1510
    # warning
1511
    (_, etxt, _) = ecode
1512
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1513
      kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1514

    
1515
    if cond:
1516
      self._Error(ecode, *args, **kwargs)
1517

    
1518
    # do not mark the operation as failed for WARN cases only
1519
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1520
      self.bad = self.bad or cond
1521

    
1522

    
1523
class LUClusterVerify(NoHooksLU):
1524
  """Submits all jobs necessary to verify the cluster.
1525

1526
  """
1527
  REQ_BGL = False
1528

    
1529
  def ExpandNames(self):
1530
    self.needed_locks = {}
1531

    
1532
  def Exec(self, feedback_fn):
1533
    jobs = []
1534

    
1535
    if self.op.group_name:
1536
      groups = [self.op.group_name]
1537
      depends_fn = lambda: None
1538
    else:
1539
      groups = self.cfg.GetNodeGroupList()
1540

    
1541
      # Verify global configuration
1542
      jobs.append([
1543
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
1544
        ])
1545

    
1546
      # Always depend on global verification
1547
      depends_fn = lambda: [(-len(jobs), [])]
1548

    
1549
    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
                                            ignore_errors=self.op.ignore_errors,
                                            depends=depends_fn())]
                for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = True

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    # Information can be safely retrieved as the BGL is acquired in exclusive
    # mode
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes))

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances))

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        group = self.my_node_info[inst.primary_node].group
        for nname in inst.secondary_nodes:
          if self.all_node_info[nname].group != group:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("these nodes could be locked: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes))
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

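    # NV_TIME is presumably a (seconds, microseconds) pair as produced by
    # utils.SplitTime on the node; after merging it back into a float, a
    # problem is only flagged if it falls outside the RPC time window
    # widened by NODE_MAX_CLOCK_SKEW on both sides.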
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, constants.CV_ENODELVM, node,
                 "Invalid character ':' in PV '%s' of VG '%s'",
                 pvname, owner_vg)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts' presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_state == constants.ADMINST_UP:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

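    # Flatten diskstatus ({node: [(success, status), ...]}) into
    # (node, success, status, disk_index) tuples so every disk of the
    # instance can be checked individually below.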
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
               not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
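        # Illustrative check (made-up numbers): if the instances having
        # prinode as primary and this node as secondary need 4096 MiB in
        # total, but the hypervisor here reports only 2048 MiB free, an
        # N+1 error is flagged for this node.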
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
2150
      (files_all, None),
2151
      (files_mc, lambda node: (node.master_candidate or
2152
                               node.name == master_node)),
2153
      (files_vm, lambda node: node.vm_capable),
2154
      ]
2155

    
2156
    # Build mapping from filename to list of nodes which should have the file
2157
    nodefiles = {}
2158
    for (files, fn) in files2nodefn:
2159
      if fn is None:
2160
        filenodes = nodeinfo
2161
      else:
2162
        filenodes = filter(fn, nodeinfo)
2163
      nodefiles.update((filename,
2164
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2165
                       for filename in files)
2166

    
2167
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2168

    
2169
    fileinfo = dict((filename, {}) for filename in nodefiles)
2170
    ignore_nodes = set()
2171

    
2172
    for node in nodeinfo:
2173
      if node.offline:
2174
        ignore_nodes.add(node.name)
2175
        continue
2176

    
2177
      nresult = all_nvinfo[node.name]
2178

    
2179
      if nresult.fail_msg or not nresult.payload:
2180
        node_files = None
2181
      else:
2182
        node_files = nresult.payload.get(constants.NV_FILELIST, None)
2183

    
2184
      test = not (node_files and isinstance(node_files, dict))
2185
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
2186
              "Node did not return file checksum data")
2187
      if test:
2188
        ignore_nodes.add(node.name)
2189
        continue
2190

    
2191
      # Build per-checksum mapping from filename to nodes having it
2192
      for (filename, checksum) in node_files.items():
2193
        assert filename in nodefiles
2194
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2195

    
2196
    for (filename, checksums) in fileinfo.items():
2197
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2198

    
2199
      # Nodes having the file
2200
      with_file = frozenset(node_name
2201
                            for nodes in fileinfo[filename].values()
2202
                            for node_name in nodes) - ignore_nodes
2203

    
2204
      expected_nodes = nodefiles[filename] - ignore_nodes
2205

    
2206
      # Nodes missing file
2207
      missing_file = expected_nodes - with_file
2208

    
2209
      if filename in files_opt:
2210
        # All or no nodes
2211
        errorif(missing_file and missing_file != expected_nodes,
2212
                constants.CV_ECLUSTERFILECHECK, None,
2213
                "File %s is optional, but it must exist on all or no"
2214
                " nodes (not found on %s)",
2215
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2216
      else:
2217
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2218
                "File %s is missing from node(s) %s", filename,
2219
                utils.CommaJoin(utils.NiceSort(missing_file)))
2220

    
2221
        # Warn if a node has a file it shouldn't
2222
        unexpected = with_file - expected_nodes
2223
        errorif(unexpected,
2224
                constants.CV_ECLUSTERFILECHECK, None,
2225
                "File %s should not exist on node(s) %s",
2226
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2227

    
2228
      # See if there are multiple versions of the file
2229
      test = len(checksums) > 1
2230
      if test:
2231
        variants = ["variant %s on %s" %
2232
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2233
                    for (idx, (checksum, nodes)) in
2234
                      enumerate(sorted(checksums.items()))]
2235
      else:
2236
        variants = []
2237

    
2238
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2239
              "File %s found with %s different checksums (%s)",
2240
              filename, len(checksums), "; ".join(variants))
2241

    
2242
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2243
                      drbd_map):
2244
    """Verifies and the node DRBD status.
2245

2246
    @type ninfo: L{objects.Node}
2247
    @param ninfo: the node to check
2248
    @param nresult: the remote results for the node
2249
    @param instanceinfo: the dict of instances
2250
    @param drbd_helper: the configured DRBD usermode helper
2251
    @param drbd_map: the DRBD map as returned by
2252
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2253

2254
    """
2255
    node = ninfo.name
2256
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2257

    
2258
    if drbd_helper:
2259
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2260
      test = (helper_result == None)
2261
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2262
               "no drbd usermode helper returned")
2263
      if helper_result:
2264
        status, payload = helper_result
2265
        test = not status
2266
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2267
                 "drbd usermode helper check unsuccessful: %s", payload)
2268
        test = status and (payload != drbd_helper)
2269
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2270
                 "wrong drbd usermode helper: %s", payload)
2271

    
2272
    # compute the DRBD minors
2273
    node_drbd = {}
2274
    for minor, instance in drbd_map[node].items():
2275
      test = instance not in instanceinfo
2276
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2277
               "ghost instance '%s' in temporary DRBD map", instance)
2278
        # ghost instance should not be running, but otherwise we
2279
        # don't give double warnings (both ghost instance and
2280
        # unallocated minor in use)
2281
      if test:
2282
        node_drbd[minor] = (instance, False)
2283
      else:
2284
        instance = instanceinfo[instance]
2285
        node_drbd[minor] = (instance.name,
2286
                            instance.admin_state == constants.ADMINST_UP)
2287

    
2288
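    # Illustrative example (instance names made up): node_drbd now maps
    # minor numbers to (instance name, should-be-active), e.g.
    #   {0: ("web1", True), 1: ("db1", False)}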
    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

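    # Each node in this group gets one SSH-check target per *other* group;
    # cycling through every group's sorted members spreads those checks
    # instead of always hitting the same peer. Illustrative result
    # (made-up names): {"node1": ["grp2-node3", "grp3-node7"], ...}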
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)

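    # Everything to be checked remotely is described by one
    # node_verify_param dict and shipped in a single call_node_verify RPC
    # per node; each NV_* key selects one check on the remote side.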
    node_verify_param = {
2705
      constants.NV_FILELIST:
2706
        utils.UniqueSequence(filename
2707
                             for files in filemap
2708
                             for filename in files),
2709
      constants.NV_NODELIST:
2710
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2711
                                  self.all_node_info.values()),
2712
      constants.NV_HYPERVISOR: hypervisors,
2713
      constants.NV_HVPARAMS:
2714
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2715
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2716
                                 for node in node_data_list
2717
                                 if not node.offline],
2718
      constants.NV_INSTANCELIST: hypervisors,
2719
      constants.NV_VERSION: None,
2720
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2721
      constants.NV_NODESETUP: None,
2722
      constants.NV_TIME: None,
2723
      constants.NV_MASTERIP: (master_node, master_ip),
2724
      constants.NV_OSLIST: None,
2725
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2726
      constants.NV_USERSCRIPTS: user_scripts,
2727
      }
2728

    
2729
    if vg_name is not None:
2730
      node_verify_param[constants.NV_VGLIST] = None
2731
      node_verify_param[constants.NV_LVLIST] = vg_name
2732
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2733
      node_verify_param[constants.NV_DRBDLIST] = None
2734

    
2735
    if drbd_helper:
2736
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2737

    
2738
    # bridge checks
2739
    # FIXME: this needs to be changed per node-group, not cluster-wide
2740
    bridges = set()
2741
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2742
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2743
      bridges.add(default_nicpp[constants.NIC_LINK])
2744
    for instance in self.my_inst_info.values():
2745
      for nic in instance.nics:
2746
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2747
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2748
          bridges.add(full_nic[constants.NIC_LINK])
2749

    
2750
    if bridges:
2751
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2752

    
2753
    # Build our expected cluster state
2754
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2755
                                                 name=node.name,
2756
                                                 vm_capable=node.vm_capable))
2757
                      for node in node_data_list)
2758

    
2759
    # Gather OOB paths
2760
    oob_paths = []
2761
    for node in self.all_node_info.values():
2762
      path = _SupportsOob(self.cfg, node)
2763
      if path and path not in oob_paths:
2764
        oob_paths.append(path)
2765

    
2766
    if oob_paths:
2767
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2768

    
2769
    for instance in self.my_inst_names:
2770
      inst_config = self.my_inst_info[instance]
2771

    
2772
      for nname in inst_config.all_nodes:
2773
        if nname not in node_image:
2774
          gnode = self.NodeImage(name=nname)
2775
          gnode.ghost = (nname not in self.all_node_info)
2776
          node_image[nname] = gnode
2777

    
2778
      inst_config.MapLVsByNode(node_vol_should)
2779

    
2780
      pnode = inst_config.primary_node
2781
      node_image[pnode].pinst.append(instance)
2782

    
2783
      for snode in inst_config.secondary_nodes:
2784
        nimg = node_image[snode]
2785
        nimg.sinst.append(instance)
2786
        if pnode not in nimg.sbp:
2787
          nimg.sbp[pnode] = []
2788
        nimg.sbp[pnode].append(instance)
2789

    
2790
    # At this point, we have the in-memory data structures complete,
2791
    # except for the runtime information, which we'll gather next
2792

    
2793
    # Due to the way our RPC system works, exact response times cannot be
2794
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2795
    # time before and after executing the request, we can at least have a time
2796
    # window.
2797
    nvinfo_starttime = time.time()
2798
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2799
                                           node_verify_param,
2800
                                           self.cfg.GetClusterName())
2801
    nvinfo_endtime = time.time()
2802

    
2803
    if self.extra_lv_nodes and vg_name is not None:
2804
      extra_lv_nvinfo = \
2805
          self.rpc.call_node_verify(self.extra_lv_nodes,
2806
                                    {constants.NV_LVLIST: vg_name},
2807
                                    self.cfg.GetClusterName())
2808
    else:
2809
      extra_lv_nvinfo = {}
2810

    
2811
    all_drbd_map = self.cfg.ComputeDRBDMap()
2812

    
2813
    feedback_fn("* Gathering disk information (%s nodes)" %
2814
                len(self.my_node_names))
2815
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2816
                                     self.my_inst_info)
2817

    
2818
    feedback_fn("* Verifying configuration file consistency")
2819

    
2820
    # If not all nodes are being checked, we need to make sure the master node
2821
    # and a non-checked vm_capable node are in the list.
2822
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2823
    if absent_nodes:
2824
      vf_nvinfo = all_nvinfo.copy()
2825
      vf_node_info = list(self.my_node_info.values())
2826
      additional_nodes = []
2827
      if master_node not in self.my_node_info:
2828
        additional_nodes.append(master_node)
2829
        vf_node_info.append(self.all_node_info[master_node])
2830
      # Add the first vm_capable node we find which is not included
2831
      for node in absent_nodes:
2832
        nodeinfo = self.all_node_info[node]
2833
        if nodeinfo.vm_capable and not nodeinfo.offline:
2834
          additional_nodes.append(node)
2835
          vf_node_info.append(self.all_node_info[node])
2836
          break
2837
      key = constants.NV_FILELIST
2838
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2839
                                                 {key: node_verify_param[key]},
2840
                                                 self.cfg.GetClusterName()))
2841
    else:
2842
      vf_nvinfo = all_nvinfo
2843
      vf_node_info = self.my_node_info.values()
2844

    
2845
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2846

    
2847
    feedback_fn("* Verifying node status")
2848

    
2849
    refos_img = None
2850

    
2851
    for node_i in node_data_list:
2852
      node = node_i.name
2853
      nimg = node_image[node]
2854

    
2855
      if node_i.offline:
2856
        if verbose:
2857
          feedback_fn("* Skipping offline node %s" % (node,))
2858
        n_offline += 1
2859
        continue
2860

    
2861
      if node == master_node:
2862
        ntype = "master"
2863
      elif node_i.master_candidate:
2864
        ntype = "master candidate"
2865
      elif node_i.drained:
2866
        ntype = "drained"
2867
        n_drained += 1
2868
      else:
2869
        ntype = "regular"
2870
      if verbose:
2871
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2872

    
2873
      msg = all_nvinfo[node].fail_msg
2874
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2875
               msg)
2876
      if msg:
2877
        nimg.rpc_fail = True
2878
        continue
2879

    
2880
      nresult = all_nvinfo[node].payload
2881

    
2882
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2883
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2884
      self._VerifyNodeNetwork(node_i, nresult)
2885
      self._VerifyNodeUserScripts(node_i, nresult)
2886
      self._VerifyOob(node_i, nresult)
2887

    
2888
      if nimg.vm_capable:
2889
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2890
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2891
                             all_drbd_map)
2892

    
2893
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2894
        self._UpdateNodeInstances(node_i, nresult, nimg)
2895
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2896
        self._UpdateNodeOS(node_i, nresult, nimg)
2897

    
2898
        if not nimg.os_fail:
2899
          if refos_img is None:
2900
            refos_img = nimg
2901
          self._VerifyNodeOS(node_i, nimg, refos_img)
2902
        self._VerifyNodeBridges(node_i, nresult, bridges)
2903

    
2904
        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
2907
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2908

    
2909
        for inst in non_primary_inst:
2910
          # FIXME: investigate best way to handle offline insts
2911
          if inst.admin_state == constants.ADMINST_OFFLINE:
2912
            if verbose:
2913
              feedback_fn("* Skipping offline instance %s" % inst.name)
2914
            i_offline += 1
2915
            continue
2916
          test = inst in self.all_inst_info
2917
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2918
                   "instance should not run on node %s", node_i.name)
2919
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2920
                   "node is running unknown instance %s", inst)
2921

    
2922
    for node, result in extra_lv_nvinfo.items():
2923
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2924
                              node_image[node], vg_name)
2925

    
2926
    feedback_fn("* Verifying instance status")
2927
    for instance in self.my_inst_names:
2928
      if verbose:
2929
        feedback_fn("* Verifying instance %s" % instance)
2930
      inst_config = self.my_inst_info[instance]
2931
      self._VerifyInstance(instance, inst_config, node_image,
2932
                           instdisk[instance])
2933
      inst_nodes_offline = []
2934

    
2935
      pnode = inst_config.primary_node
2936
      pnode_img = node_image[pnode]
2937
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2938
               constants.CV_ENODERPC, pnode, "instance %s, connection to"
2939
               " primary node failed", instance)
2940

    
2941
      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
2942
               pnode_img.offline,
2943
               constants.CV_EINSTANCEBADNODE, instance,
2944
               "instance is marked as running and lives on offline node %s",
2945
               inst_config.primary_node)
2946

    
2947
      # If the instance is non-redundant, we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand, we have no disk
      # templates with more than one secondary, so that situation is not well
      # supported either.
2951
      # FIXME: does not support file-backed instances
2952
      if not inst_config.secondary_nodes:
2953
        i_non_redundant.append(instance)
2954

    
2955
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
2956
               constants.CV_EINSTANCELAYOUT,
2957
               instance, "instance has multiple secondary nodes: %s",
2958
               utils.CommaJoin(inst_config.secondary_nodes),
2959
               code=self.ETYPE_WARNING)
2960

    
2961
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2962
        pnode = inst_config.primary_node
2963
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2964
        instance_groups = {}
2965

    
2966
        for node in instance_nodes:
2967
          instance_groups.setdefault(self.all_node_info[node].group,
2968
                                     []).append(node)
2969

    
2970
        pretty_list = [
2971
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2972
          # Sort so that we always list the primary node first.
2973
          for group, nodes in sorted(instance_groups.items(),
2974
                                     key=lambda (_, nodes): pnode in nodes,
2975
                                     reverse=True)]
2976

    
2977
        self._ErrorIf(len(instance_groups) > 1,
2978
                      constants.CV_EINSTANCESPLITGROUPS,
2979
                      instance, "instance has primary and secondary nodes in"
2980
                      " different groups: %s", utils.CommaJoin(pretty_list),
2981
                      code=self.ETYPE_WARNING)
2982

    
2983
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2984
        i_non_a_balanced.append(instance)
2985

    
2986
      for snode in inst_config.secondary_nodes:
2987
        s_img = node_image[snode]
2988
        _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2989
                 snode, "instance %s, connection to secondary node failed",
2990
                 instance)
2991

    
2992
        if s_img.offline:
2993
          inst_nodes_offline.append(snode)
2994

    
2995
      # warn that the instance lives on offline nodes
2996
      _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
2997
               "instance has offline secondary node(s) %s",
2998
               utils.CommaJoin(inst_nodes_offline))
2999
      # ... or ghost/non-vm_capable nodes
3000
      for node in inst_config.all_nodes:
3001
        _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
3002
                 instance, "instance lives on ghost node %s", node)
3003
        _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
3004
                 instance, "instance lives on non-vm_capable node %s", node)
3005

    
3006
    feedback_fn("* Verifying orphan volumes")
3007
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3008

    
3009
    # We will get spurious "unknown volume" warnings if any node of this group
3010
    # is secondary for an instance whose primary is in another group. To avoid
3011
    # them, we find these instances and add their volumes to node_vol_should.
3012
    for inst in self.all_inst_info.values():
3013
      for secondary in inst.secondary_nodes:
3014
        if (secondary in self.my_node_info
3015
            and inst.name not in self.my_inst_info):
3016
          inst.MapLVsByNode(node_vol_should)
3017
          break
3018

    
3019
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3020

    
3021
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3022
      feedback_fn("* Verifying N+1 Memory redundancy")
3023
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3024

    
3025
    feedback_fn("* Other Notes")
3026
    if i_non_redundant:
3027
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3028
                  % len(i_non_redundant))
3029

    
3030
    if i_non_a_balanced:
3031
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3032
                  % len(i_non_a_balanced))
3033

    
3034
    if i_offline:
3035
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3036

    
3037
    if n_offline:
3038
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3039

    
3040
    if n_drained:
3041
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3042

    
3043
    return not self.bad
3044

    
3045
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3046
    """Analyze the post-hooks' result
3047

3048
    This method analyses the hook result, handles it, and sends some
3049
    nicely-formatted feedback back to the user.
3050

3051
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3052
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3053
    @param hooks_results: the results of the multi-node hooks rpc call
3054
    @param feedback_fn: function used to send feedback back to the caller
3055
    @param lu_result: previous Exec result
3056
    @return: the new Exec result, based on the previous result
3057
        and hook results
3058

3059
    """
3060
    # We only really run POST phase hooks, only for non-empty groups,
3061
    # and are only interested in their results
3062
    if not self.my_node_names:
3063
      # empty node group
3064
      pass
3065
    elif phase == constants.HOOKS_PHASE_POST:
3066
      # Used to change hooks' output to proper indentation
3067
      feedback_fn("* Hooks Results")
3068
      assert hooks_results, "invalid result from hooks"
3069

    
3070
      for node_name in hooks_results:
3071
        res = hooks_results[node_name]
3072
        msg = res.fail_msg
3073
        test = msg and not res.offline
3074
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3075
                      "Communication failure in hooks execution: %s", msg)
3076
        if res.offline or msg:
3077
          # No need to investigate payload if node is offline or gave
3078
          # an error.
3079
          continue
3080
        for script, hkr, output in res.payload:
3081
          test = hkr == constants.HKR_FAIL
3082
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3083
                        "Script %s failed, output:", script)
3084
          if test:
3085
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3086
            feedback_fn("%s" % output)
3087
            lu_result = False
3088

    
3089
    return lu_result
3090

    
3091

    
3092
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])
3110

    
3111

    
3112
class LUGroupVerifyDisks(NoHooksLU):
3113
  """Verifies the status of all disks in a node group.
3114

3115
  """
3116
  REQ_BGL = False
3117

    
3118
  def ExpandNames(self):
3119
    # Raises errors.OpPrereqError on its own if group can't be found
3120
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3121

    
3122
    self.share_locks = _ShareAll()
3123
    self.needed_locks = {
3124
      locking.LEVEL_INSTANCE: [],
3125
      locking.LEVEL_NODEGROUP: [],
3126
      locking.LEVEL_NODE: [],
3127
      }
3128

    
3129
  def DeclareLocks(self, level):
3130
    if level == locking.LEVEL_INSTANCE:
3131
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
3132

    
3133
      # Lock instances optimistically, needs verification once node and group
3134
      # locks have been acquired
3135
      self.needed_locks[locking.LEVEL_INSTANCE] = \
3136
        self.cfg.GetNodeGroupInstances(self.group_uuid)
3137

    
3138
    elif level == locking.LEVEL_NODEGROUP:
3139
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3140

    
3141
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3142
        set([self.group_uuid] +
3143
            # Lock all groups used by instances optimistically; this requires
3144
            # going via the node before it's locked, requiring verification
3145
            # later on
3146
            [group_uuid
3147
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3148
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3149

    
3150
    elif level == locking.LEVEL_NODE:
3151
      # This will only lock the nodes in the group to be verified which contain
3152
      # actual instances
3153
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3154
      self._LockInstancesNodes()
3155

    
3156
      # Lock all nodes in group to be verified
3157
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3158
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3159
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3160

    
3161
  def CheckPrereq(self):
3162
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3163
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3164
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3165

    
3166
    assert self.group_uuid in owned_groups
3167

    
3168
    # Check if locked instances are still correct
3169
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3170

    
3171
    # Get instance information
3172
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3173

    
3174
    # Check if node groups for locked instances are still correct
3175
    for (instance_name, inst) in self.instances.items():
3176
      assert owned_nodes.issuperset(inst.all_nodes), \
3177
        "Instance %s's nodes changed while we kept the lock" % instance_name
3178

    
3179
      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
3180
                                             owned_groups)
3181

    
3182
      assert self.group_uuid in inst_groups, \
3183
        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
3184

    
3185
  def Exec(self, feedback_fn):
3186
    """Verify integrity of cluster disks.
3187

3188
    @rtype: tuple of three items
3189
    @return: a tuple of (dict of node-to-node_error, list of instances
3190
        which need activate-disks, dict of instance: (node, volume) for
3191
        missing volumes)
3192

3193
    """
3194
    res_nodes = {}
3195
    res_instances = set()
3196
    res_missing = {}
3197

    
3198
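    # Map of (node, volume name) -> instance for all instances that should be
    # running; entries still present after processing the node replies below
    # are reported as missing volumes.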
    nv_dict = _MapInstanceDisksToNodes([inst
3199
            for inst in self.instances.values()
3200
            if inst.admin_state == constants.ADMINST_UP])
3201

    
3202
    if nv_dict:
3203
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3204
                             set(self.cfg.GetVmCapableNodeList()))
3205

    
3206
      node_lvs = self.rpc.call_lv_list(nodes, [])
3207

    
3208
      for (node, node_res) in node_lvs.items():
3209
        if node_res.offline:
3210
          continue
3211

    
3212
        msg = node_res.fail_msg
3213
        if msg:
3214
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3215
          res_nodes[node] = msg
3216
          continue
3217

    
3218
        for lv_name, (_, _, lv_online) in node_res.payload.items():
3219
          inst = nv_dict.pop((node, lv_name), None)
3220
          if not (lv_online or inst is None):
3221
            res_instances.add(inst)
3222

    
3223
      # any leftover items in nv_dict are missing LVs; let's arrange the data
      # better
3225
      for key, inst in nv_dict.iteritems():
3226
        res_missing.setdefault(inst, []).append(list(key))
3227

    
3228
    return (res_nodes, list(res_instances), res_missing)
3229

    
3230

    
3231
class LUClusterRepairDiskSizes(NoHooksLU):
3232
  """Verifies the cluster disks sizes.
3233

3234
  """
3235
  REQ_BGL = False
3236

    
3237
  def ExpandNames(self):
3238
    if self.op.instances:
3239
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
3240
      self.needed_locks = {
3241
        locking.LEVEL_NODE_RES: [],
3242
        locking.LEVEL_INSTANCE: self.wanted_names,
3243
        }
3244
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3245
    else:
3246
      self.wanted_names = None
3247
      self.needed_locks = {
3248
        locking.LEVEL_NODE_RES: locking.ALL_SET,
3249
        locking.LEVEL_INSTANCE: locking.ALL_SET,
3250
        }
3251
    self.share_locks = {
3252
      locking.LEVEL_NODE_RES: 1,
3253
      locking.LEVEL_INSTANCE: 0,
3254
      }
3255

    
3256
  def DeclareLocks(self, level):
3257
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
3258
      self._LockInstancesNodes(primary_only=True, level=level)
3259

    
3260
  def CheckPrereq(self):
3261
    """Check prerequisites.
3262

3263
    This only checks the optional instance list against the existing names.
3264

3265
    """
3266
    if self.wanted_names is None:
3267
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3268

    
3269
    self.wanted_instances = \
3270
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3271

    
3272
  def _EnsureChildSizes(self, disk):
3273
    """Ensure children of the disk have the needed disk size.
3274

3275
    This is valid mainly for DRBD8 and fixes an issue where the
3276
    children have smaller disk size.
3277

3278
    @param disk: an L{ganeti.objects.Disk} object
3279

3280
    """
3281
    if disk.dev_type == constants.LD_DRBD8:
3282
      assert disk.children, "Empty children for DRBD8?"
3283
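      # children[0] is the data device; the DRBD metadata device is
      # deliberately left untouched (see the comment at the recursion below)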
      fchild = disk.children[0]
3284
      mismatch = fchild.size < disk.size
3285
      if mismatch:
3286
        self.LogInfo("Child disk has size %d, parent %d, fixing",
3287
                     fchild.size, disk.size)
3288
        fchild.size = disk.size
3289

    
3290
      # and we recurse on this child only, not on the metadev
3291
      return self._EnsureChildSizes(fchild) or mismatch
3292
    else:
3293
      return False
3294

    
3295
  def Exec(self, feedback_fn):
3296
    """Verify the size of cluster disks.
3297

3298
    """
3299
    # TODO: check child disks too
3300
    # TODO: check differences in size between primary/secondary nodes
3301
    per_node_disks = {}
3302
    for instance in self.wanted_instances:
3303
      pnode = instance.primary_node
3304
      if pnode not in per_node_disks:
3305
        per_node_disks[pnode] = []
3306
      for idx, disk in enumerate(instance.disks):
3307
        per_node_disks[pnode].append((instance, idx, disk))
3308

    
3309
    assert not (frozenset(per_node_disks.keys()) -
3310
                self.owned_locks(locking.LEVEL_NODE_RES)), \
3311
      "Not owning correct locks"
3312
    assert not self.owned_locks(locking.LEVEL_NODE)
3313

    
3314
    changed = []
3315
    for node, dskl in per_node_disks.items():
3316
      newl = [v[2].Copy() for v in dskl]
3317
      for dsk in newl:
3318
        self.cfg.SetDiskID(dsk, node)
3319
      result = self.rpc.call_blockdev_getsize(node, newl)
3320
      if result.fail_msg:
3321
        self.LogWarning("Failure in blockdev_getsize call to node"
3322
                        " %s, ignoring", node)
3323
        continue
3324
      if len(result.payload) != len(dskl):
3325
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
3326
                        " result.payload=%s", node, len(dskl), result.payload)
3327
        self.LogWarning("Invalid result from node %s, ignoring node results",
3328
                        node)
3329
        continue
3330
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
3331
        if size is None:
3332
          self.LogWarning("Disk %d of instance %s did not return size"
3333
                          " information, ignoring", idx, instance.name)
3334
          continue
3335
        if not isinstance(size, (int, long)):
3336
          self.LogWarning("Disk %d of instance %s did not return valid"
3337
                          " size information, ignoring", idx, instance.name)
3338
          continue
3339
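        # blockdev_getsize reports sizes in bytes; shift by 20 to convert to
        # MiB, the unit used for disk sizes in the configuration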
        size = size >> 20
3340
        if size != disk.size:
3341
          self.LogInfo("Disk %d of instance %s has mismatched size,"
3342
                       " correcting: recorded %d, actual %d", idx,
3343
                       instance.name, disk.size, size)
3344
          disk.size = size
3345
          self.cfg.Update(instance, feedback_fn)
3346
          changed.append((instance.name, idx, size))
3347
        if self._EnsureChildSizes(disk):
3348
          self.cfg.Update(instance, feedback_fn)
3349
          changed.append((instance.name, idx, disk.size))
3350
    return changed
3351

    
3352

    
3353
class LUClusterRename(LogicalUnit):
3354
  """Rename the cluster.
3355

3356
  """
3357
  HPATH = "cluster-rename"
3358
  HTYPE = constants.HTYPE_CLUSTER
3359

    
3360
  def BuildHooksEnv(self):
3361
    """Build hooks env.
3362

3363
    """
3364
    return {
3365
      "OP_TARGET": self.cfg.GetClusterName(),
3366
      "NEW_NAME": self.op.name,
3367
      }
3368

    
3369
  def BuildHooksNodes(self):
3370
    """Build hooks nodes.
3371

3372
    """
3373
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
3374

    
3375
  def CheckPrereq(self):
3376
    """Verify that the passed name is a valid one.
3377

3378
    """
3379
    hostname = netutils.GetHostname(name=self.op.name,
3380
                                    family=self.cfg.GetPrimaryIPFamily())
3381

    
3382
    new_name = hostname.name
3383
    self.ip = new_ip = hostname.ip
3384
    old_name = self.cfg.GetClusterName()
3385
    old_ip = self.cfg.GetMasterIP()
3386
    if new_name == old_name and new_ip == old_ip:
3387
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
3388
                                 " cluster has changed",
3389
                                 errors.ECODE_INVAL)
3390
    if new_ip != old_ip:
3391
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
3392
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
3393
                                   " reachable on the network" %
3394
                                   new_ip, errors.ECODE_NOTUNIQUE)
3395

    
3396
    self.op.name = new_name
3397

    
3398
  def Exec(self, feedback_fn):
3399
    """Rename the cluster.
3400

3401
    """
3402
    clustername = self.op.name
3403
    new_ip = self.ip
3404

    
3405
    # shutdown the master IP
3406
    master_params = self.cfg.GetMasterNetworkParameters()
3407
    ems = self.cfg.GetUseExternalMipScript()
3408
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3409
                                                     master_params, ems)
3410
    result.Raise("Could not disable the master role")
3411

    
3412
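    # While the master IP is down, update the configuration and the
    # known_hosts file; the finally clause below re-activates the master IP
    # (on the new address) even if one of these steps fails.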
    try:
3413
      cluster = self.cfg.GetClusterInfo()
3414
      cluster.cluster_name = clustername
3415
      cluster.master_ip = new_ip
3416
      self.cfg.Update(cluster, feedback_fn)
3417

    
3418
      # update the known hosts file
3419
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
3420
      node_list = self.cfg.GetOnlineNodeList()
3421
      try:
3422
        node_list.remove(master_params.name)
3423
      except ValueError:
3424
        pass
3425
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
3426
    finally:
3427
      master_params.ip = new_ip
3428
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3429
                                                     master_params, ems)
3430
      msg = result.fail_msg
3431
      if msg:
3432
        self.LogWarning("Could not re-enable the master role on"
3433
                        " the master, please restart manually: %s", msg)
3434

    
3435
    return clustername
3436

    
3437

    
3438
def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                                (netmask))
3457

    
3458

    
3459
class LUClusterSetParams(LogicalUnit):
3460
  """Change the parameters of the cluster.
3461

3462
  """
3463
  HPATH = "cluster-modify"
3464
  HTYPE = constants.HTYPE_CLUSTER
3465
  REQ_BGL = False
3466

    
3467
  def CheckArguments(self):
3468
    """Check parameters
3469

3470
    """
3471
    if self.op.uid_pool:
3472
      uidpool.CheckUidPool(self.op.uid_pool)
3473

    
3474
    if self.op.add_uids:
3475
      uidpool.CheckUidPool(self.op.add_uids)
3476

    
3477
    if self.op.remove_uids:
3478
      uidpool.CheckUidPool(self.op.remove_uids)
3479

    
3480
    if self.op.master_netmask is not None:
3481
      _ValidateNetmask(self.cfg, self.op.master_netmask)
3482

    
3483
  def ExpandNames(self):
3484
    # FIXME: in the future maybe other cluster params won't require checking on
3485
    # all nodes to be modified.
3486
    self.needed_locks = {
3487
      locking.LEVEL_NODE: locking.ALL_SET,
3488
    }
3489
    self.share_locks[locking.LEVEL_NODE] = 1
3490

    
3491
  def BuildHooksEnv(self):
3492
    """Build hooks env.
3493

3494
    """
3495
    return {
3496
      "OP_TARGET": self.cfg.GetClusterName(),
3497
      "NEW_VG_NAME": self.op.vg_name,
3498
      }
3499

    
3500
  def BuildHooksNodes(self):
3501
    """Build hooks nodes.
3502

3503
    """
3504
    mn = self.cfg.GetMasterNode()
3505
    return ([mn], [mn])
3506

    
3507
  def CheckPrereq(self):
3508
    """Check prerequisites.
3509

3510
    This checks whether the given params don't conflict and
3511
    if the given volume group is valid.
3512

3513
    """
3514
    if self.op.vg_name is not None and not self.op.vg_name:
3515
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
3516
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
3517
                                   " instances exist", errors.ECODE_INVAL)
3518

    
3519
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
3520
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
3521
        raise errors.OpPrereqError("Cannot disable drbd helper while"
3522
                                   " drbd-based instances exist",
3523
                                   errors.ECODE_INVAL)
3524

    
3525
    node_list = self.owned_locks(locking.LEVEL_NODE)
3526

    
3527
    # if vg_name not None, checks given volume group on all nodes
3528
    if self.op.vg_name:
3529
      vglist = self.rpc.call_vg_list(node_list)
3530
      for node in node_list:
3531
        msg = vglist[node].fail_msg
3532
        if msg:
3533
          # ignoring down node
3534
          self.LogWarning("Error while gathering data on node %s"
3535
                          " (ignoring node): %s", node, msg)
3536
          continue
3537
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
3538
                                              self.op.vg_name,
3539
                                              constants.MIN_VG_SIZE)
3540
        if vgstatus:
3541
          raise errors.OpPrereqError("Error on node '%s': %s" %
3542
                                     (node, vgstatus), errors.ECODE_ENVIRON)
3543

    
3544
    if self.op.drbd_helper:
3545
      # checks given drbd helper on all nodes
3546
      helpers = self.rpc.call_drbd_helper(node_list)
3547
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
3548
        if ninfo.offline:
3549
          self.LogInfo("Not checking drbd helper on offline node %s", node)
3550
          continue
3551
        msg = helpers[node].fail_msg
3552
        if msg:
3553
          raise errors.OpPrereqError("Error checking drbd helper on node"
3554
                                     " '%s': %s" % (node, msg),
3555
                                     errors.ECODE_ENVIRON)
3556
        node_helper = helpers[node].payload
3557
        if node_helper != self.op.drbd_helper:
3558
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
3559
                                     (node, node_helper), errors.ECODE_ENVIRON)
3560

    
3561
    self.cluster = cluster = self.cfg.GetClusterInfo()
3562
    # validate params changes
3563
    if self.op.beparams:
3564
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3565
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
3566

    
3567
    if self.op.ndparams:
3568
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
3569
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
3570

    
3571
      # TODO: we need a more general way to handle resetting
3572
      # cluster-level parameters to default values
3573
      if self.new_ndparams["oob_program"] == "":
3574
        self.new_ndparams["oob_program"] = \
3575
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
3576

    
3577
    if self.op.nicparams:
3578
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
3579
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
3580
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
3581
      nic_errors = []
3582

    
3583
      # check all instances for consistency
3584
      for instance in self.cfg.GetAllInstancesInfo().values():
3585
        for nic_idx, nic in enumerate(instance.nics):
3586
          params_copy = copy.deepcopy(nic.nicparams)
3587
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
3588

    
3589
          # check parameter syntax
3590
          try:
3591
            objects.NIC.CheckParameterSyntax(params_filled)
3592
          except errors.ConfigurationError, err:
3593
            nic_errors.append("Instance %s, nic/%d: %s" %
3594
                              (instance.name, nic_idx, err))
3595

    
3596
          # if we're moving instances to routed, check that they have an ip
3597
          target_mode = params_filled[constants.NIC_MODE]
3598
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
3599
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
3600
                              " address" % (instance.name, nic_idx))
3601
      if nic_errors:
3602
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
3603
                                   "\n".join(nic_errors))
3604

    
3605
    # hypervisor list/parameters
3606
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
3607
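    # Merge the requested per-hypervisor overrides into a copy of the current
    # cluster-wide hvparams: existing hypervisors have their dicts updated,
    # previously unknown ones get the new dict as-is.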
    if self.op.hvparams:
3608
      for hv_name, hv_dict in self.op.hvparams.items():
3609
        if hv_name not in self.new_hvparams:
3610
          self.new_hvparams[hv_name] = hv_dict
3611
        else:
3612
          self.new_hvparams[hv_name].update(hv_dict)
3613

    
3614
    # os hypervisor parameters
3615
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
3616
    if self.op.os_hvp:
3617
      for os_name, hvs in self.op.os_hvp.items():
3618
        if os_name not in self.new_os_hvp:
3619
          self.new_os_hvp[os_name] = hvs
3620
        else:
3621
          for hv_name, hv_dict in hvs.items():
3622
            if hv_name not in self.new_os_hvp[os_name]:
3623
              self.new_os_hvp[os_name][hv_name] = hv_dict
3624
            else:
3625
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
3626

    
3627
    # os parameters
3628
    self.new_osp = objects.FillDict(cluster.osparams, {})
3629
    if self.op.osparams:
3630
      for os_name, osp in self.op.osparams.items():
3631
        if os_name not in self.new_osp:
3632
          self.new_osp[os_name] = {}
3633

    
3634
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
3635
                                                  use_none=True)
3636

    
3637
        if not self.new_osp[os_name]:
3638
          # we removed all parameters
3639
          del self.new_osp[os_name]
3640
        else:
3641
          # check the parameter validity (remote check)
3642
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
3643
                         os_name, self.new_osp[os_name])
3644

    
3645
    # changes to the hypervisor list
3646
    if self.op.enabled_hypervisors is not None:
3647
      self.hv_list = self.op.enabled_hypervisors
3648
      for hv in self.hv_list:
3649
        # if the hypervisor doesn't already exist in the cluster
3650
        # hvparams, we initialize it to empty, and then (in both
3651
        # cases) we make sure to fill the defaults, as we might not
3652
        # have a complete defaults list if the hypervisor wasn't
3653
        # enabled before
3654
        if hv not in new_hvp:
3655
          new_hvp[hv] = {}
3656
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
3657
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
3658
    else:
3659
      self.hv_list = cluster.enabled_hypervisors
3660

    
3661
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
3662
      # either the enabled list has changed, or the parameters have, validate
3663
      for hv_name, hv_params in self.new_hvparams.items():
3664
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
3665
            (self.op.enabled_hypervisors and
3666
             hv_name in self.op.enabled_hypervisors)):
3667
          # either this is a new hypervisor, or its parameters have changed
3668
          hv_class = hypervisor.GetHypervisor(hv_name)
3669
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3670
          hv_class.CheckParameterSyntax(hv_params)
3671
          _CheckHVParams(self, node_list, hv_name, hv_params)
3672

    
3673
    if self.op.os_hvp:
3674
      # no need to check any newly-enabled hypervisors, since the
3675
      # defaults have already been checked in the above code-block
3676
      for os_name, os_hvp in self.new_os_hvp.items():
3677
        for hv_name, hv_params in os_hvp.items():
3678
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
3679
          # we need to fill in the new os_hvp on top of the actual hv_p
3680
          cluster_defaults = self.new_hvparams.get(hv_name, {})
3681
          new_osp = objects.FillDict(cluster_defaults, hv_params)
3682
          hv_class = hypervisor.GetHypervisor(hv_name)
3683
          hv_class.CheckParameterSyntax(new_osp)
3684
          _CheckHVParams(self, node_list, hv_name, new_osp)
3685

    
3686
    if self.op.default_iallocator:
3687
      alloc_script = utils.FindFile(self.op.default_iallocator,
3688
                                    constants.IALLOCATOR_SEARCH_PATH,
3689
                                    os.path.isfile)
3690
      if alloc_script is None:
3691
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
3692
                                   " specified" % self.op.default_iallocator,
3693
                                   errors.ECODE_INVAL)
3694

    
3695
  def Exec(self, feedback_fn):
3696
    """Change the parameters of the cluster.
3697

3698
    """
3699
    if self.op.vg_name is not None:
3700
      new_volume = self.op.vg_name
3701
      if not new_volume:
3702
        new_volume = None
3703
      if new_volume != self.cfg.GetVGName():
3704
        self.cfg.SetVGName(new_volume)
3705
      else:
3706
        feedback_fn("Cluster LVM configuration already in desired"
3707
                    " state, not changing")
3708
    if self.op.drbd_helper is not None:
3709
      new_helper = self.op.drbd_helper
3710
      if not new_helper:
3711
        new_helper = None
3712
      if new_helper != self.cfg.GetDRBDHelper():
3713
        self.cfg.SetDRBDHelper(new_helper)
3714
      else:
3715
        feedback_fn("Cluster DRBD helper already in desired state,"
3716
                    " not changing")
3717
    if self.op.hvparams:
3718
      self.cluster.hvparams = self.new_hvparams
3719
    if self.op.os_hvp:
3720
      self.cluster.os_hvp = self.new_os_hvp
3721
    if self.op.enabled_hypervisors is not None:
3722
      self.cluster.hvparams = self.new_hvparams
3723
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
3724
    if self.op.beparams:
3725
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
3726
    if self.op.nicparams:
3727
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
3728
    if self.op.osparams:
3729
      self.cluster.osparams = self.new_osp
3730
    if self.op.ndparams:
3731
      self.cluster.ndparams = self.new_ndparams
3732

    
3733
    if self.op.candidate_pool_size is not None:
3734
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
3735
      # we need to update the pool size here, otherwise the save will fail
3736
      _AdjustCandidatePool(self, [])
3737

    
3738
    if self.op.maintain_node_health is not None:
3739
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
3740
        feedback_fn("Note: CONFD was disabled at build time, node health"
3741
                    " maintenance is not useful (still enabling it)")
3742
      self.cluster.maintain_node_health = self.op.maintain_node_health
3743

    
3744
    if self.op.prealloc_wipe_disks is not None:
3745
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
3746

    
3747
    if self.op.add_uids is not None:
3748
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
3749

    
3750
    if self.op.remove_uids is not None:
3751
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
3752

    
3753
    if self.op.uid_pool is not None:
3754
      self.cluster.uid_pool = self.op.uid_pool
3755

    
3756
    if self.op.default_iallocator is not None:
3757
      self.cluster.default_iallocator = self.op.default_iallocator
3758

    
3759
    if self.op.reserved_lvs is not None:
3760
      self.cluster.reserved_lvs = self.op.reserved_lvs
3761

    
3762
    if self.op.use_external_mip_script is not None:
3763
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
3764

    
3765
    def helper_os(aname, mods, desc):
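      # 'mods' is a list of (action, OS name) pairs, where the action is one
      # of constants.DDM_ADD or constants.DDM_REMOVE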
3766
      desc += " OS list"
3767
      lst = getattr(self.cluster, aname)
3768
      for key, val in mods:
3769
        if key == constants.DDM_ADD:
3770
          if val in lst:
3771
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3772
          else:
3773
            lst.append(val)
3774
        elif key == constants.DDM_REMOVE:
3775
          if val in lst:
3776
            lst.remove(val)
3777
          else:
3778
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3779
        else:
3780
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3781

    
3782
    if self.op.hidden_os:
3783
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3784

    
3785
    if self.op.blacklisted_os:
3786
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3787

    
3788
    if self.op.master_netdev:
3789
      master_params = self.cfg.GetMasterNetworkParameters()
3790
      ems = self.cfg.GetUseExternalMipScript()
3791
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3792
                  self.cluster.master_netdev)
3793
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
3794
                                                       master_params, ems)
3795
      result.Raise("Could not disable the master ip")
3796
      feedback_fn("Changing master_netdev from %s to %s" %
3797
                  (master_params.netdev, self.op.master_netdev))
3798
      self.cluster.master_netdev = self.op.master_netdev
3799

    
3800
    if self.op.master_netmask:
3801
      master_params = self.cfg.GetMasterNetworkParameters()
3802
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
3803
      result = self.rpc.call_node_change_master_netmask(master_params.name,
3804
                                                        master_params.netmask,
3805
                                                        self.op.master_netmask,
3806
                                                        master_params.ip,
3807
                                                        master_params.netdev)
3808
      if result.fail_msg:
3809
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
3810
        feedback_fn(msg)
3811

    
3812
      self.cluster.master_netmask = self.op.master_netmask
3813

    
3814
    self.cfg.Update(self.cluster, feedback_fn)
3815

    
3816
    if self.op.master_netdev:
3817
      master_params = self.cfg.GetMasterNetworkParameters()
3818
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3819
                  self.op.master_netdev)
3820
      ems = self.cfg.GetUseExternalMipScript()
3821
      result = self.rpc.call_node_activate_master_ip(master_params.name,
3822
                                                     master_params, ems)
3823
      if result.fail_msg:
3824
        self.LogWarning("Could not re-enable the master ip on"
3825
                        " the master, please restart manually: %s",
3826
                        result.fail_msg)
3827

    
3828

    
3829
def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)
3841

    
3842

    
3843
def _ComputeAncillaryFiles(cluster, redist):
3844
  """Compute files external to Ganeti which need to be consistent.
3845

3846
  @type redist: boolean
3847
  @param redist: Whether to include files which need to be redistributed
3848

3849
  """
3850
  # Compute files for all nodes
3851
  files_all = set([
3852
    constants.SSH_KNOWN_HOSTS_FILE,
3853
    constants.CONFD_HMAC_KEY,
3854
    constants.CLUSTER_DOMAIN_SECRET_FILE,
3855
    constants.SPICE_CERT_FILE,
3856
    constants.SPICE_CACERT_FILE,
3857
    constants.RAPI_USERS_FILE,
3858
    ])
3859

    
3860
  if not redist:
3861
    files_all.update(constants.ALL_CERT_FILES)
3862
    files_all.update(ssconf.SimpleStore().GetFileList())
3863
  else:
3864
    # we need to ship at least the RAPI certificate
3865
    files_all.add(constants.RAPI_CERT_FILE)
3866

    
3867
  if cluster.modify_etc_hosts:
3868
    files_all.add(constants.ETC_HOSTS)
3869

    
3870
  # Files which are optional; these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
3873
  files_opt = set([
3874
    constants.RAPI_USERS_FILE,
3875
    ])
3876

    
3877
  # Files which should only be on master candidates
3878
  files_mc = set()
3879

    
3880
  if not redist:
3881
    files_mc.add(constants.CLUSTER_CONF_FILE)
3882

    
3883
    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
3884
    # replication
3885
    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
3886

    
3887
  # Files which should only be on VM-capable nodes
3888
  files_vm = set(filename
3889
    for hv_name in cluster.enabled_hypervisors
3890
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
3891

    
3892
  files_opt |= set(filename
3893
    for hv_name in cluster.enabled_hypervisors
3894
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
3895

    
3896
  # Filenames in each category must be unique
3897
  all_files_set = files_all | files_mc | files_vm
3898
  assert (len(all_files_set) ==
3899
          sum(map(len, [files_all, files_mc, files_vm]))), \
3900
         "Found file listed in more than one file list"
3901

    
3902
  # Optional files must be present in one other category
3903
  assert all_files_set.issuperset(files_opt), \
3904
         "Optional file not in a different required list"
3905

    
3906
  return (files_all, files_opt, files_mc, files_vm)
3907

    
3908

    
3909
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3910
  """Distribute additional files which are part of the cluster configuration.
3911

3912
  ConfigWriter takes care of distributing the config and ssconf files, but
3913
  there are more files which should be distributed to all nodes. This function
3914
  makes sure those are copied.
3915

3916
  @param lu: calling logical unit
3917
  @param additional_nodes: list of nodes not in the config to distribute to
3918
  @type additional_vm: boolean
3919
  @param additional_vm: whether the additional nodes are vm-capable or not
3920

3921
  """
3922
  # Gather target nodes
3923
  cluster = lu.cfg.GetClusterInfo()
3924
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3925

    
3926
  online_nodes = lu.cfg.GetOnlineNodeList()
3927
  vm_nodes = lu.cfg.GetVmCapableNodeList()
3928

    
3929
  if additional_nodes is not None:
3930
    online_nodes.extend(additional_nodes)
3931
    if additional_vm:
3932
      vm_nodes.extend(additional_nodes)
3933

    
3934
  # Never distribute to master node
3935
  for nodelist in [online_nodes, vm_nodes]:
3936
    if master_info.name in nodelist:
3937
      nodelist.remove(master_info.name)
3938

    
3939
  # Gather file lists
3940
  (files_all, _, files_mc, files_vm) = \
3941
    _ComputeAncillaryFiles(cluster, True)
3942

    
3943
  # Never re-distribute configuration file from here
3944
  assert not (constants.CLUSTER_CONF_FILE in files_all or
3945
              constants.CLUSTER_CONF_FILE in files_vm)
3946
  assert not files_mc, "Master candidates not handled in this function"
3947

    
3948
  filemap = [
3949
    (online_nodes, files_all),
3950
    (vm_nodes, files_vm),
3951
    ]
3952

    
3953
  # Upload the files
3954
  for (node_list, files) in filemap:
3955
    for fname in files:
3956
      _UploadHelper(lu, node_list, fname)
3957

    
3958

    
3959
class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)
3979

    
3980

    
3981
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")
4009

    
4010

    
4011
def _WaitForSync(lu, instance, disks=None, oneshot=False):
4012
  """Sleep and poll for an instance's disk to sync.
4013

4014
  """
4015
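  # Nothing to wait for if the instance has no disks, or if an explicit but
  # empty disk list was requested.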
  if not instance.disks or disks is not None and not disks:
4016
    return True
4017

    
4018
  disks = _ExpandCheckDisks(instance, disks)
4019

    
4020
  if not oneshot:
4021
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
4022

    
4023
  node = instance.primary_node
4024

    
4025
  for dev in disks:
4026
    lu.cfg.SetDiskID(dev, node)
4027

    
4028
  # TODO: Convert to utils.Retry
4029

    
4030
  retries = 0
4031
  degr_retries = 10 # in seconds, as we sleep 1 second each time
4032
  while True:
4033
    max_time = 0
4034
    done = True
4035
    cumul_degraded = False
4036
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
4037
    msg = rstats.fail_msg
4038
    if msg:
4039
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
4040
      retries += 1
4041
      if retries >= 10:
4042
        raise errors.RemoteError("Can't contact node %s for mirror data,"
4043
                                 " aborting." % node)
4044
      time.sleep(6)
4045
      continue
4046
    rstats = rstats.payload
4047
    retries = 0
4048
    for i, mstat in enumerate(rstats):
4049
      if mstat is None:
4050
        lu.LogWarning("Can't compute data for node %s/%s",
4051
                           node, disks[i].iv_name)
4052
        continue
4053

    
4054
      cumul_degraded = (cumul_degraded or
4055
                        (mstat.is_degraded and mstat.sync_percent is None))
4056
      if mstat.sync_percent is not None:
4057
        done = False
4058
        if mstat.estimated_time is not None:
4059
          rem_time = ("%s remaining (estimated)" %
4060
                      utils.FormatSeconds(mstat.estimated_time))
4061
          max_time = mstat.estimated_time
4062
        else:
4063
          rem_time = "no time estimate"
4064
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
4065
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
4066

    
4067
    # if we're done but degraded, let's do a few small retries, to make sure
    # we see a stable and not a transient situation; therefore we force a
    # restart of the loop
4070
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
4071
      logging.info("Degraded disks found, %d retries left", degr_retries)
4072
      degr_retries -= 1
4073
      time.sleep(1)
4074
      continue
4075

    
4076
    if done or oneshot:
4077
      break
4078

    
4079
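    # Poll again after at most a minute, or sooner if the estimated remaining
    # sync time is shorter.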
    time.sleep(min(60, max_time))
4080

    
4081
  if done:
4082
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
4083
  return not cumul_degraded
4084

    
4085

    
4086
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
4087
  """Check that mirrors are not degraded.
4088

4089
  The ldisk parameter, if True, changes the test from the
  is_degraded attribute (which represents the overall non-ok status of
  the device(s)) to the ldisk status (which represents the local
  storage status).
4092

4093
  """
4094
  lu.cfg.SetDiskID(dev, node)
4095

    
4096
  result = True
4097

    
4098
  if on_primary or dev.AssembleOnSecondary():
4099
    rstats = lu.rpc.call_blockdev_find(node, dev)
4100
    msg = rstats.fail_msg
4101
    if msg:
4102
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
4103
      result = False
4104
    elif not rstats.payload:
4105
      lu.LogWarning("Can't find disk on node %s", node)
4106
      result = False
4107
    else:
4108
      if ldisk:
4109
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
4110
      else:
4111
        result = result and not rstats.payload.is_degraded
4112

    
4113
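  # Recurse into the child devices; note that they are checked with the
  # default ldisk=False, regardless of the value passed for the parent.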
  if dev.children:
4114
    for child in dev.children:
4115
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
4116

    
4117
  return result
4118