lib / cmdlib.py @ 5ed4c956

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42
import itertools
43

    
44
from ganeti import ssh
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti import hypervisor
48
from ganeti import locking
49
from ganeti import constants
50
from ganeti import objects
51
from ganeti import serializer
52
from ganeti import ssconf
53
from ganeti import uidpool
54
from ganeti import compat
55
from ganeti import masterd
56
from ganeti import netutils
57
from ganeti import query
58
from ganeti import qlang
59
from ganeti import opcodes
60

    
61
import ganeti.masterd.instance # pylint: disable-msg=W0611
62

    
63

    
64
def _SupportsOob(cfg, node):
65
  """Tells if node supports OOB.
66

67
  @type cfg: L{config.ConfigWriter}
68
  @param cfg: The cluster configuration
69
  @type node: L{objects.Node}
70
  @param node: The node
71
  @return: The OOB script if supported or an empty string otherwise
72

73
  """
74
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
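# Illustrative sketch: guarding an out-of-band action on whether the node
# defines an OOB program. The helper name and the caller-supplied "lu"/"node"
# objects are hypothetical, invented for this example only.
def _ExampleRequireOob(lu, node):
  oob_program = _SupportsOob(lu.cfg, node)
  if not oob_program:
    raise errors.OpPrereqError("OOB is not supported on node %s" % node.name,
                               errors.ECODE_STATE)
  return oob_program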
75

    
76

    
77
class ResultWithJobs:
78
  """Data container for LU results with jobs.
79

80
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
81
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
82
  contained in the C{jobs} attribute and include the job IDs in the opcode
83
  result.
84

85
  """
86
  def __init__(self, jobs, **kwargs):
87
    """Initializes this class.
88

89
    Additional return values can be specified as keyword arguments.
90

91
    @type jobs: list of lists of L{opcode.OpCode}
92
    @param jobs: A list of lists of opcode objects
93

94
    """
95
    self.jobs = jobs
96
    self.other = kwargs
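# Illustrative sketch: how an LU's Exec method might hand follow-up jobs back
# to the processor. The opcode list is assumed to have been built elsewhere
# and the keyword name is invented for this example.
def _ExampleResultWithJobs(followup_opcodes):
  # One job per inner list; extra keyword values end up in the "other"
  # dictionary and are merged into the opcode result by
  # mcpu.Processor._ProcessResult.
  return ResultWithJobs([followup_opcodes], submitted_by="example")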
97

    
98

    
99
class LogicalUnit(object):
100
  """Logical Unit base class.
101

102
  Subclasses must follow these rules:
103
    - implement ExpandNames
104
    - implement CheckPrereq (except when tasklets are used)
105
    - implement Exec (except when tasklets are used)
106
    - implement BuildHooksEnv
107
    - implement BuildHooksNodes
108
    - redefine HPATH and HTYPE
109
    - optionally redefine their run requirements:
110
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
111

112
  Note that all commands require root permissions.
113

114
  @ivar dry_run_result: the value (if any) that will be returned to the caller
115
      in dry-run mode (signalled by opcode dry_run parameter)
116

117
  """
118
  HPATH = None
119
  HTYPE = None
120
  REQ_BGL = True
121

    
122
  def __init__(self, processor, op, context, rpc):
123
    """Constructor for LogicalUnit.
124

125
    This needs to be overridden in derived classes in order to check op
126
    validity.
127

128
    """
129
    self.proc = processor
130
    self.op = op
131
    self.cfg = context.cfg
132
    self.glm = context.glm
133
    self.context = context
134
    self.rpc = rpc
135
    # Dicts used to declare locking needs to mcpu
136
    self.needed_locks = None
137
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
138
    self.add_locks = {}
139
    self.remove_locks = {}
140
    # Used to force good behavior when calling helper functions
141
    self.recalculate_locks = {}
142
    # logging
143
    self.Log = processor.Log # pylint: disable-msg=C0103
144
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
145
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
146
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
147
    # support for dry-run
148
    self.dry_run_result = None
149
    # support for generic debug attribute
150
    if (not hasattr(self.op, "debug_level") or
151
        not isinstance(self.op.debug_level, int)):
152
      self.op.debug_level = 0
153

    
154
    # Tasklets
155
    self.tasklets = None
156

    
157
    # Validate opcode parameters and set defaults
158
    self.op.Validate(True)
159

    
160
    self.CheckArguments()
161

    
162
  def CheckArguments(self):
163
    """Check syntactic validity for the opcode arguments.
164

165
    This method is for doing a simple syntactic check and ensuring
166
    validity of opcode parameters, without any cluster-related
167
    checks. While the same can be accomplished in ExpandNames and/or
168
    CheckPrereq, doing these separately is better because:
169

170
      - ExpandNames is left as purely a lock-related function
171
      - CheckPrereq is run after we have acquired locks (and possible
172
        waited for them)
173

174
    The function is allowed to change the self.op attribute so that
175
    later methods can no longer worry about missing parameters.
176

177
    """
178
    pass
179

    
180
  def ExpandNames(self):
181
    """Expand names for this LU.
182

183
    This method is called before starting to execute the opcode, and it should
184
    update all the parameters of the opcode to their canonical form (e.g. a
185
    short node name must be fully expanded after this method has successfully
186
    completed). This way locking, hooks, logging, etc. can work correctly.
187

188
    LUs which implement this method must also populate the self.needed_locks
189
    member, as a dict with lock levels as keys, and a list of needed lock names
190
    as values. Rules:
191

192
      - use an empty dict if you don't need any lock
193
      - if you don't need any lock at a particular level omit that level
194
      - don't put anything for the BGL level
195
      - if you want all locks at a level use locking.ALL_SET as a value
196

197
    If you need to share locks (rather than acquire them exclusively) at one
198
    level you can modify self.share_locks, setting a true value (usually 1) for
199
    that level. By default locks are not shared.
200

201
    This function can also define a list of tasklets, which then will be
202
    executed in order instead of the usual LU-level CheckPrereq and Exec
203
    functions, if those are not defined by the LU.
204

205
    Examples::
206

207
      # Acquire all nodes and one instance
208
      self.needed_locks = {
209
        locking.LEVEL_NODE: locking.ALL_SET,
210
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
211
      }
212
      # Acquire just two nodes
213
      self.needed_locks = {
214
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
215
      }
216
      # Acquire no locks
217
      self.needed_locks = {} # No, you can't leave it to the default value None
218

219
    """
220
    # The implementation of this method is mandatory only if the new LU is
221
    # concurrent, so that old LUs don't need to be changed all at the same
222
    # time.
223
    if self.REQ_BGL:
224
      self.needed_locks = {} # Exclusive LUs don't need locks.
225
    else:
226
      raise NotImplementedError
227

    
228
  def DeclareLocks(self, level):
229
    """Declare LU locking needs for a level
230

231
    While most LUs can just declare their locking needs at ExpandNames time,
232
    sometimes there's the need to calculate some locks after having acquired
233
    the ones before. This function is called just before acquiring locks at a
234
    particular level, but after acquiring the ones at lower levels, and permits
235
    such calculations. It can be used to modify self.needed_locks, and by
236
    default it does nothing.
237

238
    This function is only called if you have something already set in
239
    self.needed_locks for the level.
240

241
    @param level: Locking level which is going to be locked
242
    @type level: member of ganeti.locking.LEVELS
243

244
    """
245

    
246
  def CheckPrereq(self):
247
    """Check prerequisites for this LU.
248

249
    This method should check that the prerequisites for the execution
250
    of this LU are fulfilled. It can do internode communication, but
251
    it should be idempotent - no cluster or system changes are
252
    allowed.
253

254
    The method should raise errors.OpPrereqError in case something is
255
    not fulfilled. Its return value is ignored.
256

257
    This method should also update all the parameters of the opcode to
258
    their canonical form if it hasn't been done by ExpandNames before.
259

260
    """
261
    if self.tasklets is not None:
262
      for (idx, tl) in enumerate(self.tasklets):
263
        logging.debug("Checking prerequisites for tasklet %s/%s",
264
                      idx + 1, len(self.tasklets))
265
        tl.CheckPrereq()
266
    else:
267
      pass
268

    
269
  def Exec(self, feedback_fn):
270
    """Execute the LU.
271

272
    This method should implement the actual work. It should raise
273
    errors.OpExecError for failures that are somewhat dealt with in
274
    code, or expected.
275

276
    """
277
    if self.tasklets is not None:
278
      for (idx, tl) in enumerate(self.tasklets):
279
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
280
        tl.Exec(feedback_fn)
281
    else:
282
      raise NotImplementedError
283

    
284
  def BuildHooksEnv(self):
285
    """Build hooks environment for this LU.
286

287
    @rtype: dict
288
    @return: Dictionary containing the environment that will be used for
289
      running the hooks for this LU. The keys of the dict must not be prefixed
290
      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
291
      will extend the environment with additional variables. If no environment
292
      should be defined, an empty dictionary should be returned (not C{None}).
293
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
294
      will not be called.
295

296
    """
297
    raise NotImplementedError
298

    
299
  def BuildHooksNodes(self):
300
    """Build list of nodes to run LU's hooks.
301

302
    @rtype: tuple; (list, list)
303
    @return: Tuple containing a list of node names on which the hook
304
      should run before the execution and a list of node names on which the
305
      hook should run after the execution. An absence of nodes should be returned as an
306
      empty list (and not None).
307
    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
308
      will not be called.
309

310
    """
311
    raise NotImplementedError
312

    
313
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
314
    """Notify the LU about the results of its hooks.
315

316
    This method is called every time a hooks phase is executed, and notifies
317
    the Logical Unit about the hooks' result. The LU can then use it to alter
318
    its result based on the hooks.  By default the method does nothing and the
319
    previous result is passed back unchanged but any LU can define it if it
320
    wants to use the local cluster hook-scripts somehow.
321

322
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
323
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
324
    @param hook_results: the results of the multi-node hooks rpc call
325
    @param feedback_fn: function used to send feedback back to the caller
326
    @param lu_result: the previous Exec result this LU had, or None
327
        in the PRE phase
328
    @return: the new Exec result, based on the previous result
329
        and hook results
330

331
    """
332
    # API must be kept, thus we ignore the unused argument and 'could
333
    # be a function' warnings
334
    # pylint: disable-msg=W0613,R0201
335
    return lu_result
336

    
337
  def _ExpandAndLockInstance(self):
338
    """Helper function to expand and lock an instance.
339

340
    Many LUs that work on an instance take its name in self.op.instance_name
341
    and need to expand it and then declare the expanded name for locking. This
342
    function does it, and then updates self.op.instance_name to the expanded
343
    name. It also initializes needed_locks as a dict, if this hasn't been done
344
    before.
345

346
    """
347
    if self.needed_locks is None:
348
      self.needed_locks = {}
349
    else:
350
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
351
        "_ExpandAndLockInstance called with instance-level locks set"
352
    self.op.instance_name = _ExpandInstanceName(self.cfg,
353
                                                self.op.instance_name)
354
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
355

    
356
  def _LockInstancesNodes(self, primary_only=False):
357
    """Helper function to declare instances' nodes for locking.
358

359
    This function should be called after locking one or more instances to lock
360
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
361
    with all primary or secondary nodes for instances already locked and
362
    present in self.needed_locks[locking.LEVEL_INSTANCE].
363

364
    It should be called from DeclareLocks, and for safety only works if
365
    self.recalculate_locks[locking.LEVEL_NODE] is set.
366

367
    In the future it may grow parameters to just lock some instance's nodes, or
368
    to just lock primaries or secondary nodes, if needed.
369

370
    It should be called in DeclareLocks in a way similar to::
371

372
      if level == locking.LEVEL_NODE:
373
        self._LockInstancesNodes()
374

375
    @type primary_only: boolean
376
    @param primary_only: only lock primary nodes of locked instances
377

378
    """
379
    assert locking.LEVEL_NODE in self.recalculate_locks, \
380
      "_LockInstancesNodes helper function called with no nodes to recalculate"
381

    
382
    # TODO: check if we've really been called with the instance locks held
383

    
384
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
385
    # future we might want to have different behaviors depending on the value
386
    # of self.recalculate_locks[locking.LEVEL_NODE]
387
    wanted_nodes = []
388
    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
389
      instance = self.context.cfg.GetInstanceInfo(instance_name)
390
      wanted_nodes.append(instance.primary_node)
391
      if not primary_only:
392
        wanted_nodes.extend(instance.secondary_nodes)
393

    
394
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
395
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
396
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
397
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
398

    
399
    del self.recalculate_locks[locking.LEVEL_NODE]
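# Illustrative sketch: a minimal LU showing the ExpandNames/DeclareLocks
# pattern documented above. The class and the assumption that the opcode
# carries an "instance_name" slot are hypothetical; it is never registered
# or instantiated.
class _ExampleInstanceLU(LogicalUnit):
  """Hypothetical LU locking one instance and, later, its nodes."""
  HPATH = None
  HTYPE = None
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed in DeclareLocks, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for %s" % self.op.instance_name)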
400

    
401

    
402
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
403
  """Simple LU which runs no hooks.
404

405
  This LU is intended as a parent for other LogicalUnits which will
406
  run no hooks, in order to reduce duplicate code.
407

408
  """
409
  HPATH = None
410
  HTYPE = None
411

    
412
  def BuildHooksEnv(self):
413
    """Empty BuildHooksEnv for NoHooksLu.
414

415
    This just raises an error.
416

417
    """
418
    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
419

    
420
  def BuildHooksNodes(self):
421
    """Empty BuildHooksNodes for NoHooksLU.
422

423
    """
424
    raise AssertionError("BuildHooksNodes called for NoHooksLU")
425

    
426

    
427
class Tasklet:
428
  """Tasklet base class.
429

430
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
431
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
432
  tasklets know nothing about locks.
433

434
  Subclasses must follow these rules:
435
    - Implement CheckPrereq
436
    - Implement Exec
437

438
  """
439
  def __init__(self, lu):
440
    self.lu = lu
441

    
442
    # Shortcuts
443
    self.cfg = lu.cfg
444
    self.rpc = lu.rpc
445

    
446
  def CheckPrereq(self):
447
    """Check prerequisites for this tasklets.
448

449
    This method should check whether the prerequisites for the execution of
450
    this tasklet are fulfilled. It can do internode communication, but it
451
    should be idempotent - no cluster or system changes are allowed.
452

453
    The method should raise errors.OpPrereqError in case something is not
454
    fulfilled. Its return value is ignored.
455

456
    This method should also update all parameters to their canonical form if it
457
    hasn't been done before.
458

459
    """
460
    pass
461

    
462
  def Exec(self, feedback_fn):
463
    """Execute the tasklet.
464

465
    This method should implement the actual work. It should raise
466
    errors.OpExecError for failures that are somewhat dealt with in code, or
467
    expected.
468

469
    """
470
    raise NotImplementedError
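# Illustrative sketch: the smallest useful tasklet shape. The "instance_name"
# attribute is hypothetical; real tasklets receive their targets from the
# owning LU, which also holds all necessary locks.
class _ExampleNoopTasklet(Tasklet):
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    # Validation only; no cluster or system changes are allowed here
    if self.cfg.GetInstanceInfo(self.instance_name) is None:
      raise errors.OpPrereqError("Instance %s is unknown" % self.instance_name,
                                 errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for instance %s" % self.instance_name)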
471

    
472

    
473
class _QueryBase:
474
  """Base for query utility classes.
475

476
  """
477
  #: Attribute holding field definitions
478
  FIELDS = None
479

    
480
  def __init__(self, filter_, fields, use_locking):
481
    """Initializes this class.
482

483
    """
484
    self.use_locking = use_locking
485

    
486
    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
487
                             namefield="name")
488
    self.requested_data = self.query.RequestedData()
489
    self.names = self.query.RequestedNames()
490

    
491
    # Sort only if no names were requested
492
    self.sort_by_name = not self.names
493

    
494
    self.do_locking = None
495
    self.wanted = None
496

    
497
  def _GetNames(self, lu, all_names, lock_level):
498
    """Helper function to determine names asked for in the query.
499

500
    """
501
    if self.do_locking:
502
      names = lu.glm.list_owned(lock_level)
503
    else:
504
      names = all_names
505

    
506
    if self.wanted == locking.ALL_SET:
507
      assert not self.names
508
      # caller didn't specify names, so ordering is not important
509
      return utils.NiceSort(names)
510

    
511
    # caller specified names and we must keep the same order
512
    assert self.names
513
    assert not self.do_locking or lu.glm.is_owned(lock_level)
514

    
515
    missing = set(self.wanted).difference(names)
516
    if missing:
517
      raise errors.OpExecError("Some items were removed before retrieving"
518
                               " their data: %s" % missing)
519

    
520
    # Return expanded names
521
    return self.wanted
522

    
523
  def ExpandNames(self, lu):
524
    """Expand names for this query.
525

526
    See L{LogicalUnit.ExpandNames}.
527

528
    """
529
    raise NotImplementedError()
530

    
531
  def DeclareLocks(self, lu, level):
532
    """Declare locks for this query.
533

534
    See L{LogicalUnit.DeclareLocks}.
535

536
    """
537
    raise NotImplementedError()
538

    
539
  def _GetQueryData(self, lu):
540
    """Collects all data for this query.
541

542
    @return: Query data object
543

544
    """
545
    raise NotImplementedError()
546

    
547
  def NewStyleQuery(self, lu):
548
    """Collect data and execute query.
549

550
    """
551
    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
552
                                  sort_by_name=self.sort_by_name)
553

    
554
  def OldStyleQuery(self, lu):
555
    """Collect data and execute query.
556

557
    """
558
    return self.query.OldStyleQuery(self._GetQueryData(lu),
559
                                    sort_by_name=self.sort_by_name)
560

    
561

    
562
def _GetWantedNodes(lu, nodes):
563
  """Returns list of checked and expanded node names.
564

565
  @type lu: L{LogicalUnit}
566
  @param lu: the logical unit on whose behalf we execute
567
  @type nodes: list
568
  @param nodes: list of node names or None for all nodes
569
  @rtype: list
570
  @return: the list of nodes, sorted
571
  @raise errors.ProgrammerError: if the nodes parameter is of the wrong type
572

573
  """
574
  if nodes:
575
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
576

    
577
  return utils.NiceSort(lu.cfg.GetNodeList())
578

    
579

    
580
def _GetWantedInstances(lu, instances):
581
  """Returns list of checked and expanded instance names.
582

583
  @type lu: L{LogicalUnit}
584
  @param lu: the logical unit on whose behalf we execute
585
  @type instances: list
586
  @param instances: list of instance names or None for all instances
587
  @rtype: list
588
  @return: the list of instances, sorted
589
  @raise errors.OpPrereqError: if the instances parameter is wrong type
590
  @raise errors.OpPrereqError: if any of the passed instances is not found
591

592
  """
593
  if instances:
594
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
595
  else:
596
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
597
  return wanted
598

    
599

    
600
def _GetUpdatedParams(old_params, update_dict,
601
                      use_default=True, use_none=False):
602
  """Return the new version of a parameter dictionary.
603

604
  @type old_params: dict
605
  @param old_params: old parameters
606
  @type update_dict: dict
607
  @param update_dict: dict containing new parameter values, or
608
      constants.VALUE_DEFAULT to reset the parameter to its default
609
      value
610
  @param use_default: boolean
611
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
612
      values as 'to be deleted' values
613
  @param use_none: boolean
614
  @type use_none: whether to recognise C{None} values as 'to be
615
      deleted' values
616
  @rtype: dict
617
  @return: the new parameter dictionary
618

619
  """
620
  params_copy = copy.deepcopy(old_params)
621
  for key, val in update_dict.iteritems():
622
    if ((use_default and val == constants.VALUE_DEFAULT) or
623
        (use_none and val is None)):
624
      try:
625
        del params_copy[key]
626
      except KeyError:
627
        pass
628
    else:
629
      params_copy[key] = val
630
  return params_copy
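# Illustrative sketch: expected behaviour of _GetUpdatedParams on a toy
# dictionary; the parameter names are invented for this example.
def _ExampleGetUpdatedParams():
  old = {"mem": 128, "vcpus": 2}
  update = {"mem": constants.VALUE_DEFAULT, "vcpus": 4, "auto_balance": True}
  # "mem" is reset to its default (the key is simply dropped), "vcpus" is
  # overridden and "auto_balance" is added, giving
  # {"vcpus": 4, "auto_balance": True}
  return _GetUpdatedParams(old, update)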
631

    
632

    
633
def _ReleaseLocks(lu, level, names=None, keep=None):
634
  """Releases locks owned by an LU.
635

636
  @type lu: L{LogicalUnit}
637
  @param level: Lock level
638
  @type names: list or None
639
  @param names: Names of locks to release
640
  @type keep: list or None
641
  @param keep: Names of locks to retain
642

643
  """
644
  assert not (keep is not None and names is not None), \
645
         "Only one of the 'names' and the 'keep' parameters can be given"
646

    
647
  if names is not None:
648
    should_release = names.__contains__
649
  elif keep:
650
    should_release = lambda name: name not in keep
651
  else:
652
    should_release = None
653

    
654
  if should_release:
655
    retain = []
656
    release = []
657

    
658
    # Determine which locks to release
659
    for name in lu.glm.list_owned(level):
660
      if should_release(name):
661
        release.append(name)
662
      else:
663
        retain.append(name)
664

    
665
    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
666

    
667
    # Release just some locks
668
    lu.glm.release(level, names=release)
669

    
670
    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
671
  else:
672
    # Release everything
673
    lu.glm.release(level)
674

    
675
    assert not lu.glm.is_owned(level), "No locks should be owned"
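# Illustrative sketch: keeping only the node locks an instance actually needs
# once CheckPrereq has narrowed things down. The "lu" and "instance" arguments
# are assumed to be a LogicalUnit and an objects.Instance owned by the caller.
def _ExampleKeepOnlyInstanceNodeLocks(lu, instance):
  _ReleaseLocks(lu, locking.LEVEL_NODE,
                keep=[instance.primary_node] + list(instance.secondary_nodes))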
676

    
677

    
678
def _RunPostHook(lu, node_name):
679
  """Runs the post-hook for an opcode on a single node.
680

681
  """
682
  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
683
  try:
684
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
685
  except:
686
    # pylint: disable-msg=W0702
687
    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
688

    
689

    
690
def _CheckOutputFields(static, dynamic, selected):
691
  """Checks whether all selected fields are valid.
692

693
  @type static: L{utils.FieldSet}
694
  @param static: static fields set
695
  @type dynamic: L{utils.FieldSet}
696
  @param dynamic: dynamic fields set
697

698
  """
699
  f = utils.FieldSet()
700
  f.Extend(static)
701
  f.Extend(dynamic)
702

    
703
  delta = f.NonMatching(selected)
704
  if delta:
705
    raise errors.OpPrereqError("Unknown output fields selected: %s"
706
                               % ",".join(delta), errors.ECODE_INVAL)
707

    
708

    
709
def _CheckGlobalHvParams(params):
710
  """Validates that given hypervisor params are not global ones.
711

712
  This will ensure that instances don't get customised versions of
713
  global params.
714

715
  """
716
  used_globals = constants.HVC_GLOBALS.intersection(params)
717
  if used_globals:
718
    msg = ("The following hypervisor parameters are global and cannot"
719
           " be customized at instance level, please modify them at"
720
           " cluster level: %s" % utils.CommaJoin(used_globals))
721
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
722

    
723

    
724
def _CheckNodeOnline(lu, node, msg=None):
725
  """Ensure that a given node is online.
726

727
  @param lu: the LU on behalf of which we make the check
728
  @param node: the node to check
729
  @param msg: if passed, should be a message to replace the default one
730
  @raise errors.OpPrereqError: if the node is offline
731

732
  """
733
  if msg is None:
734
    msg = "Can't use offline node"
735
  if lu.cfg.GetNodeInfo(node).offline:
736
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
737

    
738

    
739
def _CheckNodeNotDrained(lu, node):
740
  """Ensure that a given node is not drained.
741

742
  @param lu: the LU on behalf of which we make the check
743
  @param node: the node to check
744
  @raise errors.OpPrereqError: if the node is drained
745

746
  """
747
  if lu.cfg.GetNodeInfo(node).drained:
748
    raise errors.OpPrereqError("Can't use drained node %s" % node,
749
                               errors.ECODE_STATE)
750

    
751

    
752
def _CheckNodeVmCapable(lu, node):
753
  """Ensure that a given node is vm capable.
754

755
  @param lu: the LU on behalf of which we make the check
756
  @param node: the node to check
757
  @raise errors.OpPrereqError: if the node is not vm capable
758

759
  """
760
  if not lu.cfg.GetNodeInfo(node).vm_capable:
761
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
762
                               errors.ECODE_STATE)
763

    
764

    
765
def _CheckNodeHasOS(lu, node, os_name, force_variant):
766
  """Ensure that a node supports a given OS.
767

768
  @param lu: the LU on behalf of which we make the check
769
  @param node: the node to check
770
  @param os_name: the OS to query about
771
  @param force_variant: whether to ignore variant errors
772
  @raise errors.OpPrereqError: if the node is not supporting the OS
773

774
  """
775
  result = lu.rpc.call_os_get(node, os_name)
776
  result.Raise("OS '%s' not in supported OS list for node %s" %
777
               (os_name, node),
778
               prereq=True, ecode=errors.ECODE_INVAL)
779
  if not force_variant:
780
    _CheckOSVariant(result.payload, os_name)
781

    
782

    
783
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
784
  """Ensure that a node has the given secondary ip.
785

786
  @type lu: L{LogicalUnit}
787
  @param lu: the LU on behalf of which we make the check
788
  @type node: string
789
  @param node: the node to check
790
  @type secondary_ip: string
791
  @param secondary_ip: the ip to check
792
  @type prereq: boolean
793
  @param prereq: whether to throw a prerequisite or an execute error
794
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
795
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
796

797
  """
798
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
799
  result.Raise("Failure checking secondary ip on node %s" % node,
800
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
801
  if not result.payload:
802
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
803
           " please fix and re-run this command" % secondary_ip)
804
    if prereq:
805
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
806
    else:
807
      raise errors.OpExecError(msg)
808

    
809

    
810
def _GetClusterDomainSecret():
811
  """Reads the cluster domain secret.
812

813
  """
814
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
815
                               strict=True)
816

    
817

    
818
def _CheckInstanceDown(lu, instance, reason):
819
  """Ensure that an instance is not running."""
820
  if instance.admin_up:
821
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
822
                               (instance.name, reason), errors.ECODE_STATE)
823

    
824
  pnode = instance.primary_node
825
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
826
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
827
              prereq=True, ecode=errors.ECODE_ENVIRON)
828

    
829
  if instance.name in ins_l.payload:
830
    raise errors.OpPrereqError("Instance %s is running, %s" %
831
                               (instance.name, reason), errors.ECODE_STATE)
832

    
833

    
834
def _ExpandItemName(fn, name, kind):
835
  """Expand an item name.
836

837
  @param fn: the function to use for expansion
838
  @param name: requested item name
839
  @param kind: text description ('Node' or 'Instance')
840
  @return: the resolved (full) name
841
  @raise errors.OpPrereqError: if the item is not found
842

843
  """
844
  full_name = fn(name)
845
  if full_name is None:
846
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
847
                               errors.ECODE_NOENT)
848
  return full_name
849

    
850

    
851
def _ExpandNodeName(cfg, name):
852
  """Wrapper over L{_ExpandItemName} for nodes."""
853
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
854

    
855

    
856
def _ExpandInstanceName(cfg, name):
857
  """Wrapper over L{_ExpandItemName} for instance."""
858
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
859

    
860

    
861
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
862
                          memory, vcpus, nics, disk_template, disks,
863
                          bep, hvp, hypervisor_name):
864
  """Builds instance related env variables for hooks
865

866
  This builds the hook environment from individual variables.
867

868
  @type name: string
869
  @param name: the name of the instance
870
  @type primary_node: string
871
  @param primary_node: the name of the instance's primary node
872
  @type secondary_nodes: list
873
  @param secondary_nodes: list of secondary nodes as strings
874
  @type os_type: string
875
  @param os_type: the name of the instance's OS
876
  @type status: boolean
877
  @param status: the should_run status of the instance
878
  @type memory: string
879
  @param memory: the memory size of the instance
880
  @type vcpus: string
881
  @param vcpus: the count of VCPUs the instance has
882
  @type nics: list
883
  @param nics: list of tuples (ip, mac, mode, link) representing
884
      the NICs the instance has
885
  @type disk_template: string
886
  @param disk_template: the disk template of the instance
887
  @type disks: list
888
  @param disks: the list of (size, mode) pairs
889
  @type bep: dict
890
  @param bep: the backend parameters for the instance
891
  @type hvp: dict
892
  @param hvp: the hypervisor parameters for the instance
893
  @type hypervisor_name: string
894
  @param hypervisor_name: the hypervisor for the instance
895
  @rtype: dict
896
  @return: the hook environment for this instance
897

898
  """
899
  if status:
900
    str_status = "up"
901
  else:
902
    str_status = "down"
903
  env = {
904
    "OP_TARGET": name,
905
    "INSTANCE_NAME": name,
906
    "INSTANCE_PRIMARY": primary_node,
907
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
908
    "INSTANCE_OS_TYPE": os_type,
909
    "INSTANCE_STATUS": str_status,
910
    "INSTANCE_MEMORY": memory,
911
    "INSTANCE_VCPUS": vcpus,
912
    "INSTANCE_DISK_TEMPLATE": disk_template,
913
    "INSTANCE_HYPERVISOR": hypervisor_name,
914
  }
915

    
916
  if nics:
917
    nic_count = len(nics)
918
    for idx, (ip, mac, mode, link) in enumerate(nics):
919
      if ip is None:
920
        ip = ""
921
      env["INSTANCE_NIC%d_IP" % idx] = ip
922
      env["INSTANCE_NIC%d_MAC" % idx] = mac
923
      env["INSTANCE_NIC%d_MODE" % idx] = mode
924
      env["INSTANCE_NIC%d_LINK" % idx] = link
925
      if mode == constants.NIC_MODE_BRIDGED:
926
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
927
  else:
928
    nic_count = 0
929

    
930
  env["INSTANCE_NIC_COUNT"] = nic_count
931

    
932
  if disks:
933
    disk_count = len(disks)
934
    for idx, (size, mode) in enumerate(disks):
935
      env["INSTANCE_DISK%d_SIZE" % idx] = size
936
      env["INSTANCE_DISK%d_MODE" % idx] = mode
937
  else:
938
    disk_count = 0
939

    
940
  env["INSTANCE_DISK_COUNT"] = disk_count
941

    
942
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
943
    for key, value in source.items():
944
      env["INSTANCE_%s_%s" % (kind, key)] = value
945

    
946
  return env
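# Illustrative sketch: the environment built for a hypothetical one-NIC,
# one-disk instance (every value below is made up purely for this example).
def _ExampleInstanceHookEnv():
  return _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
                               "debootstrap", True, 128, 1,
                               [("198.51.100.10", "aa:00:00:11:22:33",
                                 constants.NIC_MODE_BRIDGED, "xen-br0")],
                               "plain", [(10240, "rw")], {}, {}, "xen-pvm")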
947

    
948

    
949
def _NICListToTuple(lu, nics):
950
  """Build a list of nic information tuples.
951

952
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
953
  value in LUInstanceQueryData.
954

955
  @type lu:  L{LogicalUnit}
956
  @param lu: the logical unit on whose behalf we execute
957
  @type nics: list of L{objects.NIC}
958
  @param nics: list of nics to convert to hooks tuples
959

960
  """
961
  hooks_nics = []
962
  cluster = lu.cfg.GetClusterInfo()
963
  for nic in nics:
964
    ip = nic.ip
965
    mac = nic.mac
966
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
967
    mode = filled_params[constants.NIC_MODE]
968
    link = filled_params[constants.NIC_LINK]
969
    hooks_nics.append((ip, mac, mode, link))
970
  return hooks_nics
971

    
972

    
973
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
974
  """Builds instance related env variables for hooks from an object.
975

976
  @type lu: L{LogicalUnit}
977
  @param lu: the logical unit on whose behalf we execute
978
  @type instance: L{objects.Instance}
979
  @param instance: the instance for which we should build the
980
      environment
981
  @type override: dict
982
  @param override: dictionary with key/values that will override
983
      our values
984
  @rtype: dict
985
  @return: the hook environment dictionary
986

987
  """
988
  cluster = lu.cfg.GetClusterInfo()
989
  bep = cluster.FillBE(instance)
990
  hvp = cluster.FillHV(instance)
991
  args = {
992
    'name': instance.name,
993
    'primary_node': instance.primary_node,
994
    'secondary_nodes': instance.secondary_nodes,
995
    'os_type': instance.os,
996
    'status': instance.admin_up,
997
    'memory': bep[constants.BE_MEMORY],
998
    'vcpus': bep[constants.BE_VCPUS],
999
    'nics': _NICListToTuple(lu, instance.nics),
1000
    'disk_template': instance.disk_template,
1001
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
1002
    'bep': bep,
1003
    'hvp': hvp,
1004
    'hypervisor_name': instance.hypervisor,
1005
  }
1006
  if override:
1007
    args.update(override)
1008
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
1009

    
1010

    
1011
def _AdjustCandidatePool(lu, exceptions):
1012
  """Adjust the candidate pool after node operations.
1013

1014
  """
1015
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1016
  if mod_list:
1017
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1018
               utils.CommaJoin(node.name for node in mod_list))
1019
    for name in mod_list:
1020
      lu.context.ReaddNode(name)
1021
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1022
  if mc_now > mc_max:
1023
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1024
               (mc_now, mc_max))
1025

    
1026

    
1027
def _DecideSelfPromotion(lu, exceptions=None):
1028
  """Decide whether I should promote myself as a master candidate.
1029

1030
  """
1031
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1032
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1033
  # the new node will increase mc_max with one, so:
1034
  mc_should = min(mc_should + 1, cp_size)
1035
  return mc_now < mc_should
1036

    
1037

    
1038
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1039
  """Check that the brigdes needed by a list of nics exist.
1040

1041
  """
1042
  cluster = lu.cfg.GetClusterInfo()
1043
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1044
  brlist = [params[constants.NIC_LINK] for params in paramslist
1045
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1046
  if brlist:
1047
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1048
    result.Raise("Error checking bridges on destination node '%s'" %
1049
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1050

    
1051

    
1052
def _CheckInstanceBridgesExist(lu, instance, node=None):
1053
  """Check that the brigdes needed by an instance exist.
1054

1055
  """
1056
  if node is None:
1057
    node = instance.primary_node
1058
  _CheckNicsBridgesExist(lu, instance.nics, node)
1059

    
1060

    
1061
def _CheckOSVariant(os_obj, name):
1062
  """Check whether an OS name conforms to the os variants specification.
1063

1064
  @type os_obj: L{objects.OS}
1065
  @param os_obj: OS object to check
1066
  @type name: string
1067
  @param name: OS name passed by the user, to check for validity
1068

1069
  """
1070
  if not os_obj.supported_variants:
1071
    return
1072
  variant = objects.OS.GetVariant(name)
1073
  if not variant:
1074
    raise errors.OpPrereqError("OS name must include a variant",
1075
                               errors.ECODE_INVAL)
1076

    
1077
  if variant not in os_obj.supported_variants:
1078
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1079

    
1080

    
1081
def _GetNodeInstancesInner(cfg, fn):
1082
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1083

    
1084

    
1085
def _GetNodeInstances(cfg, node_name):
1086
  """Returns a list of all primary and secondary instances on a node.
1087

1088
  """
1089

    
1090
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1091

    
1092

    
1093
def _GetNodePrimaryInstances(cfg, node_name):
1094
  """Returns primary instances on a node.
1095

1096
  """
1097
  return _GetNodeInstancesInner(cfg,
1098
                                lambda inst: node_name == inst.primary_node)
1099

    
1100

    
1101
def _GetNodeSecondaryInstances(cfg, node_name):
1102
  """Returns secondary instances on a node.
1103

1104
  """
1105
  return _GetNodeInstancesInner(cfg,
1106
                                lambda inst: node_name in inst.secondary_nodes)
1107

    
1108

    
1109
def _GetStorageTypeArgs(cfg, storage_type):
1110
  """Returns the arguments for a storage type.
1111

1112
  """
1113
  # Special case for file storage
1114
  if storage_type == constants.ST_FILE:
1115
    # storage.FileStorage wants a list of storage directories
1116
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1117

    
1118
  return []
1119

    
1120

    
1121
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1122
  faulty = []
1123

    
1124
  for dev in instance.disks:
1125
    cfg.SetDiskID(dev, node_name)
1126

    
1127
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1128
  result.Raise("Failed to get disk status from node %s" % node_name,
1129
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1130

    
1131
  for idx, bdev_status in enumerate(result.payload):
1132
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1133
      faulty.append(idx)
1134

    
1135
  return faulty
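# Illustrative sketch: flagging an instance whose primary node reports faulty
# mirrored disks; the "lu" and "instance" arguments are assumed to be supplied
# (and locked) by the caller.
def _ExampleWarnAboutFaultyDisks(lu, instance):
  faulty = _FindFaultyInstanceDisks(lu.cfg, lu.rpc, instance,
                                    instance.primary_node, True)
  if faulty:
    lu.LogWarning("Instance %s has faulty disk(s): %s" %
                  (instance.name, utils.CommaJoin(faulty)))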
1136

    
1137

    
1138
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1139
  """Check the sanity of iallocator and node arguments and use the
1140
  cluster-wide iallocator if appropriate.
1141

1142
  Check that at most one of (iallocator, node) is specified. If none is
1143
  specified, then the LU's opcode's iallocator slot is filled with the
1144
  cluster-wide default iallocator.
1145

1146
  @type iallocator_slot: string
1147
  @param iallocator_slot: the name of the opcode iallocator slot
1148
  @type node_slot: string
1149
  @param node_slot: the name of the opcode target node slot
1150

1151
  """
1152
  node = getattr(lu.op, node_slot, None)
1153
  iallocator = getattr(lu.op, iallocator_slot, None)
1154

    
1155
  if node is not None and iallocator is not None:
1156
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1157
                               errors.ECODE_INVAL)
1158
  elif node is None and iallocator is None:
1159
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1160
    if default_iallocator:
1161
      setattr(lu.op, iallocator_slot, default_iallocator)
1162
    else:
1163
      raise errors.OpPrereqError("No iallocator or node given and no"
1164
                                 " cluster-wide default iallocator found."
1165
                                 " Please specify either an iallocator or a"
1166
                                 " node, or set a cluster-wide default"
1167
                                 " iallocator.")
1168

    
1169

    
1170
class LUClusterPostInit(LogicalUnit):
1171
  """Logical unit for running hooks after cluster initialization.
1172

1173
  """
1174
  HPATH = "cluster-init"
1175
  HTYPE = constants.HTYPE_CLUSTER
1176

    
1177
  def BuildHooksEnv(self):
1178
    """Build hooks env.
1179

1180
    """
1181
    return {
1182
      "OP_TARGET": self.cfg.GetClusterName(),
1183
      }
1184

    
1185
  def BuildHooksNodes(self):
1186
    """Build hooks nodes.
1187

1188
    """
1189
    return ([], [self.cfg.GetMasterNode()])
1190

    
1191
  def Exec(self, feedback_fn):
1192
    """Nothing to do.
1193

1194
    """
1195
    return True
1196

    
1197

    
1198
class LUClusterDestroy(LogicalUnit):
1199
  """Logical unit for destroying the cluster.
1200

1201
  """
1202
  HPATH = "cluster-destroy"
1203
  HTYPE = constants.HTYPE_CLUSTER
1204

    
1205
  def BuildHooksEnv(self):
1206
    """Build hooks env.
1207

1208
    """
1209
    return {
1210
      "OP_TARGET": self.cfg.GetClusterName(),
1211
      }
1212

    
1213
  def BuildHooksNodes(self):
1214
    """Build hooks nodes.
1215

1216
    """
1217
    return ([], [])
1218

    
1219
  def CheckPrereq(self):
1220
    """Check prerequisites.
1221

1222
    This checks whether the cluster is empty.
1223

1224
    Any errors are signaled by raising errors.OpPrereqError.
1225

1226
    """
1227
    master = self.cfg.GetMasterNode()
1228

    
1229
    nodelist = self.cfg.GetNodeList()
1230
    if len(nodelist) != 1 or nodelist[0] != master:
1231
      raise errors.OpPrereqError("There are still %d node(s) in"
1232
                                 " this cluster." % (len(nodelist) - 1),
1233
                                 errors.ECODE_INVAL)
1234
    instancelist = self.cfg.GetInstanceList()
1235
    if instancelist:
1236
      raise errors.OpPrereqError("There are still %d instance(s) in"
1237
                                 " this cluster." % len(instancelist),
1238
                                 errors.ECODE_INVAL)
1239

    
1240
  def Exec(self, feedback_fn):
1241
    """Destroys the cluster.
1242

1243
    """
1244
    master = self.cfg.GetMasterNode()
1245

    
1246
    # Run post hooks on master node before it's removed
1247
    _RunPostHook(self, master)
1248

    
1249
    result = self.rpc.call_node_stop_master(master, False)
1250
    result.Raise("Could not disable the master role")
1251

    
1252
    return master
1253

    
1254

    
1255
def _VerifyCertificate(filename):
1256
  """Verifies a certificate for LUClusterVerify.
1257

1258
  @type filename: string
1259
  @param filename: Path to PEM file
1260

1261
  """
1262
  try:
1263
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1264
                                           utils.ReadFile(filename))
1265
  except Exception, err: # pylint: disable-msg=W0703
1266
    return (LUClusterVerify.ETYPE_ERROR,
1267
            "Failed to load X509 certificate %s: %s" % (filename, err))
1268

    
1269
  (errcode, msg) = \
1270
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1271
                                constants.SSL_CERT_EXPIRATION_ERROR)
1272

    
1273
  if msg:
1274
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1275
  else:
1276
    fnamemsg = None
1277

    
1278
  if errcode is None:
1279
    return (None, fnamemsg)
1280
  elif errcode == utils.CERT_WARNING:
1281
    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
1282
  elif errcode == utils.CERT_ERROR:
1283
    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
1284

    
1285
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1286

    
1287

    
1288
class LUClusterVerify(LogicalUnit):
1289
  """Verifies the cluster status.
1290

1291
  """
1292
  HPATH = "cluster-verify"
1293
  HTYPE = constants.HTYPE_CLUSTER
1294
  REQ_BGL = False
1295

    
1296
  TCLUSTER = "cluster"
1297
  TNODE = "node"
1298
  TINSTANCE = "instance"
1299

    
1300
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1301
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1302
  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
1303
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1304
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1305
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1306
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1307
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1308
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1309
  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
1310
  ENODEDRBD = (TNODE, "ENODEDRBD")
1311
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1312
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1313
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1314
  ENODEHV = (TNODE, "ENODEHV")
1315
  ENODELVM = (TNODE, "ENODELVM")
1316
  ENODEN1 = (TNODE, "ENODEN1")
1317
  ENODENET = (TNODE, "ENODENET")
1318
  ENODEOS = (TNODE, "ENODEOS")
1319
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1320
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1321
  ENODERPC = (TNODE, "ENODERPC")
1322
  ENODESSH = (TNODE, "ENODESSH")
1323
  ENODEVERSION = (TNODE, "ENODEVERSION")
1324
  ENODESETUP = (TNODE, "ENODESETUP")
1325
  ENODETIME = (TNODE, "ENODETIME")
1326
  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
1327

    
1328
  ETYPE_FIELD = "code"
1329
  ETYPE_ERROR = "ERROR"
1330
  ETYPE_WARNING = "WARNING"
1331

    
1332
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1333

    
1334
  class NodeImage(object):
1335
    """A class representing the logical and physical status of a node.
1336

1337
    @type name: string
1338
    @ivar name: the node name to which this object refers
1339
    @ivar volumes: a structure as returned from
1340
        L{ganeti.backend.GetVolumeList} (runtime)
1341
    @ivar instances: a list of running instances (runtime)
1342
    @ivar pinst: list of configured primary instances (config)
1343
    @ivar sinst: list of configured secondary instances (config)
1344
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1345
        instances for which this node is secondary (config)
1346
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1347
    @ivar dfree: free disk, as reported by the node (runtime)
1348
    @ivar offline: the offline status (config)
1349
    @type rpc_fail: boolean
1350
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1351
        not whether the individual keys were correct) (runtime)
1352
    @type lvm_fail: boolean
1353
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1354
    @type hyp_fail: boolean
1355
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1356
    @type ghost: boolean
1357
    @ivar ghost: whether this is a known node or not (config)
1358
    @type os_fail: boolean
1359
    @ivar os_fail: whether the RPC call didn't return valid OS data
1360
    @type oslist: list
1361
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1362
    @type vm_capable: boolean
1363
    @ivar vm_capable: whether the node can host instances
1364

1365
    """
1366
    def __init__(self, offline=False, name=None, vm_capable=True):
1367
      self.name = name
1368
      self.volumes = {}
1369
      self.instances = []
1370
      self.pinst = []
1371
      self.sinst = []
1372
      self.sbp = {}
1373
      self.mfree = 0
1374
      self.dfree = 0
1375
      self.offline = offline
1376
      self.vm_capable = vm_capable
1377
      self.rpc_fail = False
1378
      self.lvm_fail = False
1379
      self.hyp_fail = False
1380
      self.ghost = False
1381
      self.os_fail = False
1382
      self.oslist = {}
1383

    
1384
  def ExpandNames(self):
1385
    self.needed_locks = {
1386
      locking.LEVEL_NODE: locking.ALL_SET,
1387
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1388
    }
1389
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1390

    
1391
  def _Error(self, ecode, item, msg, *args, **kwargs):
1392
    """Format an error message.
1393

1394
    Based on the opcode's error_codes parameter, either format a
1395
    parseable error code, or a simpler error string.
1396

1397
    This must be called only from Exec and functions called from Exec.
1398

1399
    """
1400
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1401
    itype, etxt = ecode
1402
    # first complete the msg
1403
    if args:
1404
      msg = msg % args
1405
    # then format the whole message
1406
    if self.op.error_codes:
1407
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1408
    else:
1409
      if item:
1410
        item = " " + item
1411
      else:
1412
        item = ""
1413
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1414
    # and finally report it via the feedback_fn
1415
    self._feedback_fn("  - %s" % msg)
1416

    
1417
  def _ErrorIf(self, cond, *args, **kwargs):
1418
    """Log an error message if the passed condition is True.
1419

1420
    """
1421
    cond = bool(cond) or self.op.debug_simulate_errors
1422
    if cond:
1423
      self._Error(*args, **kwargs)
1424
    # do not mark the operation as failed for WARN cases only
1425
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1426
      self.bad = self.bad or cond
1427

    
1428
  def _VerifyNode(self, ninfo, nresult):
1429
    """Perform some basic validation on data returned from a node.
1430

1431
      - check the result data structure is well formed and has all the
1432
        mandatory fields
1433
      - check ganeti version
1434

1435
    @type ninfo: L{objects.Node}
1436
    @param ninfo: the node to check
1437
    @param nresult: the results from the node
1438
    @rtype: boolean
1439
    @return: whether overall this call was successful (and we can expect
1440
         reasonable values in the response)
1441

1442
    """
1443
    node = ninfo.name
1444
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1445

    
1446
    # main result, nresult should be a non-empty dict
1447
    test = not nresult or not isinstance(nresult, dict)
1448
    _ErrorIf(test, self.ENODERPC, node,
1449
                  "unable to verify node: no data returned")
1450
    if test:
1451
      return False
1452

    
1453
    # compares ganeti version
1454
    local_version = constants.PROTOCOL_VERSION
1455
    remote_version = nresult.get("version", None)
1456
    test = not (remote_version and
1457
                isinstance(remote_version, (list, tuple)) and
1458
                len(remote_version) == 2)
1459
    _ErrorIf(test, self.ENODERPC, node,
1460
             "connection to node returned invalid data")
1461
    if test:
1462
      return False
1463

    
1464
    test = local_version != remote_version[0]
1465
    _ErrorIf(test, self.ENODEVERSION, node,
1466
             "incompatible protocol versions: master %s,"
1467
             " node %s", local_version, remote_version[0])
1468
    if test:
1469
      return False
1470

    
1471
    # node seems compatible, we can actually try to look into its results
1472

    
1473
    # full package version
1474
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1475
                  self.ENODEVERSION, node,
1476
                  "software version mismatch: master %s, node %s",
1477
                  constants.RELEASE_VERSION, remote_version[1],
1478
                  code=self.ETYPE_WARNING)
1479

    
1480
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1481
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1482
      for hv_name, hv_result in hyp_result.iteritems():
1483
        test = hv_result is not None
1484
        _ErrorIf(test, self.ENODEHV, node,
1485
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1486

    
1487
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1488
    if ninfo.vm_capable and isinstance(hvp_result, list):
1489
      for item, hv_name, hv_result in hvp_result:
1490
        _ErrorIf(True, self.ENODEHV, node,
1491
                 "hypervisor %s parameter verify failure (source %s): %s",
1492
                 hv_name, item, hv_result)
1493

    
1494
    test = nresult.get(constants.NV_NODESETUP,
1495
                       ["Missing NODESETUP results"])
1496
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1497
             "; ".join(test))
1498

    
1499
    return True
1500

    
1501
  def _VerifyNodeTime(self, ninfo, nresult,
1502
                      nvinfo_starttime, nvinfo_endtime):
1503
    """Check the node time.
1504

1505
    @type ninfo: L{objects.Node}
1506
    @param ninfo: the node to check
1507
    @param nresult: the remote results for the node
1508
    @param nvinfo_starttime: the start time of the RPC call
1509
    @param nvinfo_endtime: the end time of the RPC call
1510

1511
    """
1512
    node = ninfo.name
1513
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1514

    
1515
    ntime = nresult.get(constants.NV_TIME, None)
1516
    try:
1517
      ntime_merged = utils.MergeTime(ntime)
1518
    except (ValueError, TypeError):
1519
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1520
      return
1521

    
1522
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1523
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1524
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1525
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1526
    else:
1527
      ntime_diff = None
1528

    
1529
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1530
             "Node time diverges by at least %s from master node time",
1531
             ntime_diff)
1532

    
1533
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

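  # The N+1 memory check below sums, for each (secondary node, primary node)
  # pair in the sbp map, the backend memory of all auto-balanced instances
  # that would fail over to that secondary. As an illustrative example
  # (hypothetical names and sizes): if node2 is secondary for instances
  # "web" (BE_MEMORY=512) and "db" (BE_MEMORY=1024) whose primary is node1,
  # then node2 needs at least 1536 MiB free, otherwise ENODEN1 is reported.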
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline:
        # we're skipping offline nodes from the N+1 warning, since
        # most likely we don't have good memory information from them;
        # we already list instances living on such nodes, and that's
        # enough warning
        continue
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

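  # _VerifyFiles below receives the ancillary file map as four sets: files
  # required on all nodes, optional files that must exist on all or no nodes,
  # files restricted to master candidates, and files restricted to vm_capable
  # nodes. Checksums reported by the nodes are aggregated into a mapping of
  # the form {filename: {checksum: set(node names)}}, for example (values
  # purely illustrative): {"/etc/hosts": {"d41d8cd98f00...": set(["node1"])}}.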
  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_all_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    node_names = frozenset(node.name for node in nodeinfo)

    assert master_node in node_names
    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
           "Found file listed in more than one file list"

    # Define functions determining which nodes to consider for a file
    file2nodefn = dict([(filename, fn)
      for (files, fn) in [(files_all, None),
                          (files_all_opt, None),
                          (files_mc, lambda node: (node.master_candidate or
                                                   node.name == master_node)),
                          (files_vm, lambda node: node.vm_capable)]
      for filename in files])

    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())

    for node in nodeinfo:
      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        node_files = nresult.payload.get(constants.NV_FILELIST, None)

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, cls.ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        continue

      for (filename, checksum) in node_files.items():
        # Check if the file should be considered for a node
        fn = file2nodefn[filename]
        if fn is None or fn(node):
          fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes)

      # Nodes missing file
      missing_file = node_names - with_file

      if filename in files_all_opt:
        # All or no nodes
        errorif(missing_file and missing_file != node_names,
                cls.ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no nodes (not"
                " found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, cls.ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # this will be caught in the backend too
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
               and not f_var, self.ENODEOS, node,
               "OS %s with API at least %d does not declare any variant",
               os_name, constants.OS_API_V15)
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

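  # _CollectDiskInfo below queries all nodes in a single RPC batch and returns
  # a nested mapping keyed first by instance, then by node, with one
  # (success, payload) tuple per disk. A purely illustrative example with
  # hypothetical names: {"inst1": {"node1": [(True, status0), (True, status1)],
  # "node2": [(False, "node offline")]}}; diskless instances map to {}.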
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, self.ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"

    return instdisk

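  # _VerifyHVP below operates on the hvp_data list assembled in Exec(): each
  # entry is a (source, hypervisor name, parameters) tuple such as
  # ("cluster", "kvm", {...}) or ("os debian", "kvm", {...}); the source and
  # parameter values here are illustrative only. Each entry is checked locally
  # via the hypervisor class' CheckParameterSyntax, so syntax errors are
  # reported without contacting any node.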
  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisor(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run in the post phase only; their failure is
    logged in the verify output and makes the verification fail.

    """
    cfg = self.cfg

    env = {
      "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in cfg.GetAllNodesInfo().values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.cfg.GetNodeList())

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable-msg=R0914
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    # Check the cluster certificates
    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    cluster = self.cfg.GetClusterInfo()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    # Compute the set of hypervisor parameters
    hvp_data = []
    for hv_name in hypervisors:
      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
    for os_name, os_hvp in cluster.os_hvp.items():
      for hv_name, hv_params in os_hvp.items():
        if not hv_params:
          continue
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
    # TODO: collapse identical parameter values in a single one
    for instance in instanceinfo.values():
      if not instance.hvparams:
        continue
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))
    # and verify them locally
    self._VerifyHVP(hvp_data)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST:
        utils.UniqueSequence(filename
                             for files in filemap
                             for filename in files),
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS: hvp_data,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in nodeinfo)

    # Gather OOB paths
    oob_paths = []
    for node in nodeinfo:
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in instancelist:
      inst_config = instanceinfo[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          # ghost node
          gnode = self.NodeImage(name=nname)
          gnode.ghost = True
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)

    feedback_fn("* Verifying configuration file consistency")
2371
    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
2372

    
2373
    feedback_fn("* Verifying node status")
2374

    
2375
    refos_img = None
2376

    
2377
    for node_i in nodeinfo:
2378
      node = node_i.name
2379
      nimg = node_image[node]
2380

    
2381
      if node_i.offline:
2382
        if verbose:
2383
          feedback_fn("* Skipping offline node %s" % (node,))
2384
        n_offline += 1
2385
        continue
2386

    
2387
      if node == master_node:
2388
        ntype = "master"
2389
      elif node_i.master_candidate:
2390
        ntype = "master candidate"
2391
      elif node_i.drained:
2392
        ntype = "drained"
2393
        n_drained += 1
2394
      else:
2395
        ntype = "regular"
2396
      if verbose:
2397
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2398

    
2399
      msg = all_nvinfo[node].fail_msg
2400
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2401
      if msg:
2402
        nimg.rpc_fail = True
2403
        continue
2404

    
2405
      nresult = all_nvinfo[node].payload
2406

    
2407
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2408
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2409
      self._VerifyNodeNetwork(node_i, nresult)
2410
      self._VerifyOob(node_i, nresult)
2411

    
2412
      if nimg.vm_capable:
2413
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2414
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2415
                             all_drbd_map)
2416

    
2417
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2418
        self._UpdateNodeInstances(node_i, nresult, nimg)
2419
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2420
        self._UpdateNodeOS(node_i, nresult, nimg)
2421
        if not nimg.os_fail:
2422
          if refos_img is None:
2423
            refos_img = nimg
2424
          self._VerifyNodeOS(node_i, nimg, refos_img)
2425

    
2426
    feedback_fn("* Verifying instance status")
2427
    for instance in instancelist:
2428
      if verbose:
2429
        feedback_fn("* Verifying instance %s" % instance)
2430
      inst_config = instanceinfo[instance]
2431
      self._VerifyInstance(instance, inst_config, node_image,
2432
                           instdisk[instance])
2433
      inst_nodes_offline = []
2434

    
2435
      pnode = inst_config.primary_node
2436
      pnode_img = node_image[pnode]
2437
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2438
               self.ENODERPC, pnode, "instance %s, connection to"
2439
               " primary node failed", instance)
2440

    
2441
      _ErrorIf(inst_config.admin_up and pnode_img.offline,
2442
               self.EINSTANCEBADNODE, instance,
2443
               "instance is marked as running and lives on offline node %s",
2444
               inst_config.primary_node)
2445

    
2446
      # If the instance is non-redundant we cannot survive losing its primary
2447
      # node, so we are not N+1 compliant. On the other hand we have no disk
2448
      # templates with more than one secondary so that situation is not well
2449
      # supported either.
2450
      # FIXME: does not support file-backed instances
2451
      if not inst_config.secondary_nodes:
2452
        i_non_redundant.append(instance)
2453

    
2454
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2455
               instance, "instance has multiple secondary nodes: %s",
2456
               utils.CommaJoin(inst_config.secondary_nodes),
2457
               code=self.ETYPE_WARNING)
2458

    
2459
      if inst_config.disk_template in constants.DTS_INT_MIRROR:
2460
        pnode = inst_config.primary_node
2461
        instance_nodes = utils.NiceSort(inst_config.all_nodes)
2462
        instance_groups = {}
2463

    
2464
        for node in instance_nodes:
2465
          instance_groups.setdefault(nodeinfo_byname[node].group,
2466
                                     []).append(node)
2467

    
2468
        pretty_list = [
2469
          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
2470
          # Sort so that we always list the primary node first.
2471
          for group, nodes in sorted(instance_groups.items(),
2472
                                     key=lambda (_, nodes): pnode in nodes,
2473
                                     reverse=True)]
2474

    
2475
        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
2476
                      instance, "instance has primary and secondary nodes in"
2477
                      " different groups: %s", utils.CommaJoin(pretty_list),
2478
                      code=self.ETYPE_WARNING)
2479

    
2480
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2481
        i_non_a_balanced.append(instance)
2482

    
2483
      for snode in inst_config.secondary_nodes:
2484
        s_img = node_image[snode]
2485
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2486
                 "instance %s, connection to secondary node failed", instance)
2487

    
2488
        if s_img.offline:
2489
          inst_nodes_offline.append(snode)
2490

    
2491
      # warn that the instance lives on offline nodes
2492
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2493
               "instance has offline secondary node(s) %s",
2494
               utils.CommaJoin(inst_nodes_offline))
2495
      # ... or ghost/non-vm_capable nodes
2496
      for node in inst_config.all_nodes:
2497
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2498
                 "instance lives on ghost node %s", node)
2499
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2500
                 instance, "instance lives on non-vm_capable node %s", node)
2501

    
2502
    feedback_fn("* Verifying orphan volumes")
2503
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2504
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2505

    
2506
    feedback_fn("* Verifying orphan instances")
2507
    self._VerifyOrphanInstances(instancelist, node_image)
2508

    
2509
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2510
      feedback_fn("* Verifying N+1 Memory redundancy")
2511
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2512

    
2513
    feedback_fn("* Other Notes")
2514
    if i_non_redundant:
2515
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2516
                  % len(i_non_redundant))
2517

    
2518
    if i_non_a_balanced:
2519
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2520
                  % len(i_non_a_balanced))
2521

    
2522
    if n_offline:
2523
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2524

    
2525
    if n_drained:
2526
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2527

    
2528
    return not self.bad
2529

    
2530
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
    instances = self.cfg.GetAllInstancesInfo().values()

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if not inst.admin_up:
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, [])
    for node, node_res in node_lvs.items():
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.glm.list_owned(locking.LEVEL_NODE)

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

3141
      desc += " OS list"
3142
      lst = getattr(self.cluster, aname)
3143
      for key, val in mods:
3144
        if key == constants.DDM_ADD:
3145
          if val in lst:
3146
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
3147
          else:
3148
            lst.append(val)
3149
        elif key == constants.DDM_REMOVE:
3150
          if val in lst:
3151
            lst.remove(val)
3152
          else:
3153
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
3154
        else:
3155
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
3156

    
3157
    if self.op.hidden_os:
3158
      helper_os("hidden_os", self.op.hidden_os, "hidden")
3159

    
3160
    if self.op.blacklisted_os:
3161
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
3162

    
3163
    if self.op.master_netdev:
3164
      master = self.cfg.GetMasterNode()
3165
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
3166
                  self.cluster.master_netdev)
3167
      result = self.rpc.call_node_stop_master(master, False)
3168
      result.Raise("Could not disable the master ip")
3169
      feedback_fn("Changing master_netdev from %s to %s" %
3170
                  (self.cluster.master_netdev, self.op.master_netdev))
3171
      self.cluster.master_netdev = self.op.master_netdev
3172

    
3173
    self.cfg.Update(self.cluster, feedback_fn)
3174

    
3175
    if self.op.master_netdev:
3176
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
3177
                  self.op.master_netdev)
3178
      result = self.rpc.call_node_start_master(master, False, False)
3179
      if result.fail_msg:
3180
        self.LogWarning("Could not re-enable the master ip on"
3181
                        " the master, please restart manually: %s",
3182
                        result.fail_msg)
3183

    
3184

    
3185
def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


def _ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  if not redist:
    files_all.update(constants.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(constants.ETC_HOSTS)

  # Files which must either exist on all nodes or on none
  files_all_opt = set([
    constants.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()
  if not redist:
    files_mc.add(constants.CLUSTER_CONF_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(filename
    for hv_name in cluster.enabled_hypervisors
    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())

  # Filenames must be unique
  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
         "Found file listed in more than one file list"

  return (files_all, files_all_opt, files_mc, files_vm)


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  vm_nodes = lu.cfg.GetVmCapableNodeList()

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, files_all_opt, files_mc, files_vm) = \
    _ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (constants.CLUSTER_CONF_FILE in files_all or
              constants.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (online_nodes, files_all_opt),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      _UploadHelper(lu, node_list, fname)


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

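    # wait before polling again: at most a minute, or less if the last
    # reported sync estimate says the devices should finish sooner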
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

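  # also descend into any child devices (for example the local storage
  # below a mirrored device); a failed check at any layer makes the
  # overall result False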
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUOobCommand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  REQ_BGL = False
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.nodes = []
    self.master_node = self.cfg.GetMasterNode()

    assert self.op.power_delay >= 0.0

    if self.op.node_names:
      if (self.op.command in self._SKIP_MASTER and
          self.master_node in self.op.node_names):
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)

        if master_oob_handler:
          additional_text = ("run '%s %s %s' if you want to operate on the"
                             " master regardless") % (master_oob_handler,
                                                      self.op.command,
                                                      self.master_node)
        else:
          additional_text = "it does not support out-of-band operations"

        raise errors.OpPrereqError(("Operating on the master node %s is not"
                                    " allowed for %s; %s") %
                                   (self.master_node, self.op.command,
                                    additional_text), errors.ECODE_INVAL)
    else:
      self.op.node_names = self.cfg.GetNodeList()
      if self.op.command in self._SKIP_MASTER:
        self.op.node_names.remove(self.master_node)

    if self.op.command in self._SKIP_MASTER:
      assert self.master_node not in self.op.node_names

    for node_name in self.op.node_names:
      node = self.cfg.GetNodeInfo(node_name)

      if node is None:
        raise errors.OpPrereqError("Node %s not found" % node_name,
                                   errors.ECODE_NOENT)
      else:
        self.nodes.append(node)

      if (not self.op.ignore_status and
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
                                    " not marked offline") % node_name,
                                   errors.ECODE_STATE)

  def ExpandNames(self):
    """Gather locks we need.

    """
    if self.op.node_names:
      self.op.node_names = [_ExpandNodeName(self.cfg, name)
                            for name in self.op.node_names]
      lock_names = self.op.node_names
    else:
      lock_names = locking.ALL_SET

    self.needed_locks = {
      locking.LEVEL_NODE: lock_names,
      }

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.master_node
    ret = []

    for idx, node in enumerate(self.nodes):
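      # each per-node result is a list of (status, payload) tuples; the
      # first entry carries the node name, later entries the command
      # outcome (see the constants.RS_* status codes used below)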
      node_entry = [(constants.RS_NORMAL, node.name)]
      ret.append(node_entry)

      oob_program = _SupportsOob(self.cfg, node)

      if not oob_program:
        node_entry.append((constants.RS_UNAVAIL, None))
        continue

      logging.info("Executing out-of-band command '%s' using '%s' on %s",
                   self.op.command, oob_program, node.name)
      result = self.rpc.call_run_oob(master_node, oob_program,
                                     self.op.command, node.name,
                                     self.op.timeout)

      if result.fail_msg:
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                        node.name, result.fail_msg)
        node_entry.append((constants.RS_NODATA, None))
      else:
        try:
          self._CheckPayload(result)
        except errors.OpExecError, err:
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                          node.name, err)
          node_entry.append((constants.RS_NODATA, None))
        else:
          if self.op.command == constants.OOB_HEALTH:
            # For health we should log important events
            for item, status in result.payload:
              if status in [constants.OOB_STATUS_WARNING,
                            constants.OOB_STATUS_CRITICAL]:
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
                                item, node.name, status)

          if self.op.command == constants.OOB_POWER_ON:
            node.powered = True
          elif self.op.command == constants.OOB_POWER_OFF:
            node.powered = False
          elif self.op.command == constants.OOB_POWER_STATUS:
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
            if powered != node.powered:
              logging.warning(("Recorded power state (%s) of node '%s' does not"
                               " match actual power state (%s)"), node.powered,
                              node.name, powered)

          # For configuration changing commands we should update the node
          if self.op.command in (constants.OOB_POWER_ON,
                                 constants.OOB_POWER_OFF):
            self.cfg.Update(node, feedback_fn)

          node_entry.append((constants.RS_NORMAL, result.payload))

          if (self.op.command == constants.OOB_POWER_ON and
              idx < len(self.nodes) - 1):
            time.sleep(self.op.power_delay)

    return ret

  def _CheckPayload(self, result):
    """Checks if the payload is valid.

    @param result: RPC result
    @raises errors.OpExecError: If payload is not valid

    """
    errs = []
    if self.op.command == constants.OOB_HEALTH:
      if not isinstance(result.payload, list):
        errs.append("command 'health' is expected to return a list but got %s" %
                    type(result.payload))
      else:
        for item, status in result.payload:
          if status not in constants.OOB_STATUSES:
            errs.append("health item '%s' has invalid status '%s'" %
                        (item, status))

    if self.op.command == constants.OOB_POWER_STATUS:
      if not isinstance(result.payload, dict):
        errs.append("power-status is expected to return a dict but got %s" %
                    type(result.payload))

    if self.op.command in [
        constants.OOB_POWER_ON,
        constants.OOB_POWER_OFF,
        constants.OOB_POWER_CYCLE,
        ]:
      if result.payload is not None:
        errs.append("%s is expected to not return payload but got '%s'" %
                    (self.op.command, result.payload))

    if errs:
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
                               utils.CommaJoin(errs))


class _OsQuery(_QueryBase):
  FIELDS = query.OS_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
    cluster = lu.cfg.GetClusterInfo()

    data = {}

    for (os_name, os_data) in pol.items():
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
                          hidden=(os_name in cluster.hidden_os),
                          blacklisted=(os_name in cluster.blacklisted_os))

      variants = set()
      parameters = set()
      api_versions = set()

      for idx, osl in enumerate(os_data.values()):
        info.valid = bool(info.valid and osl and osl[0][1])
        if not info.valid:
          break

        (node_variants, node_params, node_api) = osl[0][3:6]
        if idx == 0:
          # First entry
          variants.update(node_variants)
          parameters.update(node_params)
          api_versions.update(node_api)
        else:
          # Filter out inconsistent values
          variants.intersection_update(node_variants)
          parameters.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      info.variants = list(variants)
      info.parameters = list(parameters)
      info.api_versions = list(api_versions)

      data[os_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]


class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

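    # Illustrative example: if only "name" is requested and no OS names are
    # given (so name_filter is empty), the resulting filter is just
    #   [OP_AND, [OP_NOT, [OP_TRUE, "hidden"]],
    #            [OP_NOT, [OP_TRUE, "blacklisted"]], [OP_TRUE, "valid"]]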
    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node '%s', which is about to be removed, was not found"
                      " in the list of all nodes", self.op.node_name)
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    _RunPostHook(self, node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks[locking.LEVEL_NODE] = 1

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
                                        lu.cfg.GetHypervisorType())
      live_data = dict((name, nresult.payload)
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node