root / lib / cmdlib.py @ a8c931c0

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd

import ganeti.masterd.instance # pylint: disable-msg=W0611


# need to define these here before the actual LUs

def _EmptyList():
  """Returns an empty list.

  """
  return []


def _EmptyDict():
  """Returns an empty dict.

  """
  return {}


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)
  @cvar _OP_DEFS: a list of opcode attributes and the default values
      they should get if not already existing

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  _OP_DEFS = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for aname, aval in self._OP_DEFS:
      if not hasattr(self.op, aname):
        if callable(aval):
          dval = aval()
        else:
          dval = aval
        setattr(self.op, aname, dval)

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of the opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and could
    # be a function warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]

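# Illustrative sketch (not part of the original module): a minimal LU subclass
# wiring together the contract described above. The opcode attribute names
# used here ("message", "verbose") are made up for the example; only the class
# attributes and method names come from the LogicalUnit contract.
#
#   class LUHypotheticalNoop(LogicalUnit):
#     HPATH = None
#     HTYPE = None
#     _OP_REQP = ["message"]            # opcode must provide .message
#     _OP_DEFS = [("verbose", False)]   # default filled in by __init__
#     REQ_BGL = False                   # so ExpandNames must set the locks
#
#     def ExpandNames(self):
#       self.needed_locks = {}          # no locks needed
#
#     def BuildHooksEnv(self):
#       return {}, [], []               # env dict, pre-nodes, post-nodes
#
#     def CheckPrereq(self):
#       pass                            # nothing to verify
#
#     def Exec(self, feedback_fn):
#       feedback_fn(self.op.message)
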
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError

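# Illustrative sketch (not part of the original module): how an LU can
# delegate its work to tasklets instead of implementing CheckPrereq and Exec
# itself. The tasklet class name is hypothetical; the mechanism (setting
# self.tasklets in ExpandNames so that the base CheckPrereq/Exec iterate over
# it) is the one implemented by LogicalUnit above.
#
#   class _HypotheticalTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass                            # idempotent checks only
#
#     def Exec(self, feedback_fn):
#       feedback_fn("tasklet ran")
#
#   # inside some LU's ExpandNames():
#   #   self.needed_locks = {}
#   #   self.tasklets = [_HypotheticalTasklet(self)]
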
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy

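# Illustrative example (not part of the original module) of the
# _GetUpdatedParams semantics above, shown doctest-style with made-up
# parameter names:
#
#   >>> old = {"mem": 128, "vcpus": 1}
#   >>> _GetUpdatedParams(old, {"mem": constants.VALUE_DEFAULT, "vcpus": 4})
#   {'vcpus': 4}
#   >>> _GetUpdatedParams(old, {"mem": None}, use_none=True)
#   {'vcpus': 1}
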
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()


def _GetClusterDomainSecret():
  """Reads the cluster domain secret.

  """
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
                               strict=True)


def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env

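# Illustrative example (not part of the original module): for a made-up
# single-NIC, single-disk instance, _BuildInstanceHookEnv above would produce,
# among others, keys such as:
#
#   INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
#   INSTANCE_STATUS ("up" or "down"), INSTANCE_NIC_COUNT,
#   INSTANCE_NIC0_IP, INSTANCE_NIC0_MAC, INSTANCE_NIC0_MODE,
#   INSTANCE_NIC0_LINK, INSTANCE_DISK_COUNT, INSTANCE_DISK0_SIZE,
#   INSTANCE_DISK0_MODE, plus INSTANCE_BE_* and INSTANCE_HV_* entries
#   for the backend and hypervisor parameters.
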
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  cluster = lu.cfg.GetClusterInfo()
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should

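# Illustrative example (not part of the original module) of the
# _DecideSelfPromotion arithmetic above, with made-up numbers: if
# candidate_pool_size is 10 and GetMasterCandidateStats reports mc_now=3 and
# mc_should=3, then mc_should becomes min(3 + 1, 10) = 4, and since 3 < 4 the
# new node should promote itself to master candidate.
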
def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)

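# Illustrative example (not part of the original module): with a made-up OS
# name such as "debootstrap+testing", _CheckOSVariant above splits on the
# first '+' and checks that "testing" is listed in os_obj.supported_variants;
# a name without a '+' is rejected whenever the OS declares supported
# variants.
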
def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEOS = (TNODE, "ENODEOS")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
        of this node (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS

    """
    def __init__(self, offline=False, name=None):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

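  # Illustrative example (not part of the original module) of the two _Error
  # output formats above, with made-up values: with op.error_codes set, an
  # entry is reported as
  #   "ERROR:ENODESSH:node:node1.example.com:message text"
  # and otherwise as
  #   "ERROR: node node1.example.com: message text"
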
  def _ErrorIf(self, cond, *args, **kwargs):
1239
    """Log an error message if the passed condition is True.
1240

1241
    """
1242
    cond = bool(cond) or self.op.debug_simulate_errors
1243
    if cond:
1244
      self._Error(*args, **kwargs)
1245
    # do not mark the operation as failed for WARN cases only
1246
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1247
      self.bad = self.bad or cond
1248

    
1249
  def _VerifyNode(self, ninfo, nresult):
1250
    """Run multiple tests against a node.
1251

1252
    Test list:
1253

1254
      - compares ganeti version
1255
      - checks vg existence and size > 20G
1256
      - checks config file checksum
1257
      - checks ssh to other nodes
1258

1259
    @type ninfo: L{objects.Node}
1260
    @param ninfo: the node to check
1261
    @param nresult: the results from the node
1262
    @rtype: boolean
1263
    @return: whether overall this call was successful (and we can expect
1264
         reasonable values in the respose)
1265

1266
    """
1267
    node = ninfo.name
1268
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1269

    
1270
    # main result, nresult should be a non-empty dict
1271
    test = not nresult or not isinstance(nresult, dict)
1272
    _ErrorIf(test, self.ENODERPC, node,
1273
                  "unable to verify node: no data returned")
1274
    if test:
1275
      return False
1276

    
1277
    # compares ganeti version
1278
    local_version = constants.PROTOCOL_VERSION
1279
    remote_version = nresult.get("version", None)
1280
    test = not (remote_version and
1281
                isinstance(remote_version, (list, tuple)) and
1282
                len(remote_version) == 2)
1283
    _ErrorIf(test, self.ENODERPC, node,
1284
             "connection to node returned invalid data")
1285
    if test:
1286
      return False
1287

    
1288
    test = local_version != remote_version[0]
1289
    _ErrorIf(test, self.ENODEVERSION, node,
1290
             "incompatible protocol versions: master %s,"
1291
             " node %s", local_version, remote_version[0])
1292
    if test:
1293
      return False
1294

    
1295
    # node seems compatible, we can actually try to look into its results
1296

    
1297
    # full package version
1298
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1299
                  self.ENODEVERSION, node,
1300
                  "software version mismatch: master %s, node %s",
1301
                  constants.RELEASE_VERSION, remote_version[1],
1302
                  code=self.ETYPE_WARNING)
1303

    
1304
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1305
    if isinstance(hyp_result, dict):
1306
      for hv_name, hv_result in hyp_result.iteritems():
1307
        test = hv_result is not None
1308
        _ErrorIf(test, self.ENODEHV, node,
1309
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1310

    
1311

    
1312
    test = nresult.get(constants.NV_NODESETUP,
1313
                           ["Missing NODESETUP results"])
1314
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1315
             "; ".join(test))
1316

    
1317
    return True
1318

    
1319
  def _VerifyNodeTime(self, ninfo, nresult,
1320
                      nvinfo_starttime, nvinfo_endtime):
1321
    """Check the node time.
1322

1323
    @type ninfo: L{objects.Node}
1324
    @param ninfo: the node to check
1325
    @param nresult: the remote results for the node
1326
    @param nvinfo_starttime: the start time of the RPC call
1327
    @param nvinfo_endtime: the end time of the RPC call
1328

1329
    """
1330
    node = ninfo.name
1331
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1332

    
1333
    ntime = nresult.get(constants.NV_TIME, None)
1334
    try:
1335
      ntime_merged = utils.MergeTime(ntime)
1336
    except (ValueError, TypeError):
1337
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1338
      return
1339

    
1340
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1341
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1342
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1343
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1344
    else:
1345
      ntime_diff = None
1346

    
1347
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1348
             "Node time diverges by at least %s from master node time",
1349
             ntime_diff)
1350

    
1351
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1352
    """Check the node time.
1353

1354
    @type ninfo: L{objects.Node}
1355
    @param ninfo: the node to check
1356
    @param nresult: the remote results for the node
1357
    @param vg_name: the configured VG name
1358

1359
    """
1360
    if vg_name is None:
1361
      return
1362

    
1363
    node = ninfo.name
1364
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1365

    
1366
    # checks vg existence and size > 20G
1367
    vglist = nresult.get(constants.NV_VGLIST, None)
1368
    test = not vglist
1369
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1370
    if not test:
1371
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1372
                                            constants.MIN_VG_SIZE)
1373
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1374

    
1375
    # check pv names
1376
    pvlist = nresult.get(constants.NV_PVLIST, None)
1377
    test = pvlist is None
1378
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1379
    if not test:
1380
      # check that ':' is not present in PV names, since it's a
1381
      # special character for lvcreate (denotes the range of PEs to
1382
      # use on the PV)
1383
      for _, pvname, owner_vg in pvlist:
1384
        test = ":" in pvname
1385
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1386
                 " '%s' of VG '%s'", pvname, owner_vg)
1387

    
1388
  def _VerifyNodeNetwork(self, ninfo, nresult):
1389
    """Check the node time.
1390

1391
    @type ninfo: L{objects.Node}
1392
    @param ninfo: the node to check
1393
    @param nresult: the remote results for the node
1394

1395
    """
1396
    node = ninfo.name
1397
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1398

    
1399
    test = constants.NV_NODELIST not in nresult
1400
    _ErrorIf(test, self.ENODESSH, node,
1401
             "node hasn't returned node ssh connectivity data")
1402
    if not test:
1403
      if nresult[constants.NV_NODELIST]:
1404
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1405
          _ErrorIf(True, self.ENODESSH, node,
1406
                   "ssh communication with node '%s': %s", a_node, a_msg)
1407

    
1408
    test = constants.NV_NODENETTEST not in nresult
1409
    _ErrorIf(test, self.ENODENET, node,
1410
             "node hasn't returned node tcp connectivity data")
1411
    if not test:
1412
      if nresult[constants.NV_NODENETTEST]:
1413
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1414
        for anode in nlist:
1415
          _ErrorIf(True, self.ENODENET, node,
1416
                   "tcp communication with node '%s': %s",
1417
                   anode, nresult[constants.NV_NODENETTEST][anode])
1418

    
1419
    test = constants.NV_MASTERIP not in nresult
1420
    _ErrorIf(test, self.ENODENET, node,
1421
             "node hasn't returned node master IP reachability data")
1422
    if not test:
1423
      if not nresult[constants.NV_MASTERIP]:
1424
        if node == self.master_node:
1425
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate failovers should"
                      " peer node %s fail", prinode)
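  # Illustrative note (editorial annotation, not part of the original code):
  # a node whose n_img.mfree is 1024 passes the check above for a peer whose
  # auto-balanced instances need 512 + 256 MiB of BE_MEMORY, but adding a
  # third 512 MiB instance with the same primary would trigger ENODEN1.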

    
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                       master_files):
    """Verifies and computes the node required file checksums.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param master_files: list of files that only masters should have

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_cksum = nresult.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if test:
      return

    for file_name in file_list:
      node_is_mc = ninfo.master_candidate
      must_have = (file_name not in master_files) or node_is_mc
      # missing
      test1 = file_name not in remote_cksum
      # invalid checksum
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
      # existing and good
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
               "file '%s' missing", file_name)
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
               "file '%s' has wrong checksum", file_name)
      # not candidate and this is not a must-have file
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist on non master"
               " candidates (and the file is outdated)", file_name)
      # all good, except non-master/non-must have combination
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist"
               " on non master candidates", file_name)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)
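  # Illustrative note (editorial annotation, not part of the original code):
  # drbd_map is keyed by node name and maps DRBD minors to instance names,
  # e.g. {"node1.example.com": {0: "instance1", 1: "instance2"}}, while
  # NV_DRBDLIST is simply the list of minors the node reports as in use.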

    
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(remote_os,
                           lambda v: isinstance(v, list) and len(v) == 7))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict
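  # Illustrative note (editorial annotation, not part of the original code):
  # after this call nimg.oslist maps OS names to lists of
  # (path, status, diagnose, variants, parameters, api_versions) tuples, e.g.
  # {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
  #                   set(["default"]), set(), set([20]))]}.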

    
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # this will be caught in the backend too
      _ErrorIf(compat.any(f_api, lambda v: v >= constants.OS_API_V15)
               and not f_var, self.ENODEOS, node,
               "OS %s with API at least %d does not declare any variant",
               os_name, constants.OS_API_V15)
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", f_param, b_param)]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s %s differs from reference node %s: %s vs. %s",
                 kind, os_name, base.name,
                 utils.CommaJoin(a), utils.CommaJoin(b))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")
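  # Illustrative note (editorial annotation, not part of the original code):
  # the NV_HVINFO payload is a dict along the lines of
  # {"memory_free": 2048, "memory_total": 4096} (MiB), and NV_VGLIST maps
  # volume group names to a size value in MiB, so nimg.mfree and nimg.dfree
  # end up as plain integers that the other checks can compare directly.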

    
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    # Check the cluster certificates
    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    cluster = self.cfg.GetClusterInfo()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.extend(constants.ALL_CERT_FILES)
    file_names.extend(master_files)
    if cluster.modify_etc_hosts:
      file_names.append(constants.ETC_HOSTS)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      }
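    # Illustrative note (editorial annotation, not part of the original
    # code): each NV_* key asks the node daemon for one class of data, e.g.
    # NV_FILELIST requests checksums of the files listed above, and
    # NV_NODENETTEST asks the node to check connectivity against the
    # (name, primary_ip, secondary_ip) tuples of its non-offline peers.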

    
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name))
                      for node in nodeinfo)

    for instance in instancelist:
      inst_config = instanceinfo[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          # ghost node
          gnode = self.NodeImage(name=nname)
          gnode.ghost = True
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in nodeinfo:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeLVM(node_i, nresult, vg_name)
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                            master_files)
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)

      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
      self._UpdateNodeInstances(node_i, nresult, nimg)
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
      self._UpdateNodeOS(node_i, nresult, nimg)
      if not nimg.os_fail:
        if refos_img is None:
          refos_img = nimg
        self._VerifyNodeOS(node_i, nimg, refos_img)

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_image)
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      if pnode_img.offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_image)

    feedback_fn("* Verifying orphan instances")
    self._VerifyOrphanInstances(instancelist, node_image)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, instanceinfo)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

    
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
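  # Illustrative note (editorial annotation, not part of the original code):
  # a run that finds one unreachable node and one instance with an offline
  # LV might return something like
  #   ({"node3": "Connection failed"}, ["instance1"],
  #    {"instance2": [("node2", "xenvg/disk0")]})
  # i.e. (node errors, instances needing activate-disks, missing volumes).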

    
class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
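  # Illustrative note (editorial annotation, not part of the original code):
  # nodes report block device sizes in bytes, so "size >> 20" converts them
  # to MiB before comparing with the configuration; the returned list then
  # contains entries such as ("instance1", 0, 10240) for every disk whose
  # recorded size was corrected.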

    
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    all_nodes = self.cfg.GetNodeList()
    return env, [mn], all_nodes

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.GetHostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
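# Illustrative note (editorial annotation, not part of the original code):
# for a DRBD8 disk whose children are LV-backed data and metadata devices,
# the recursion above reaches an LD_LV child and returns True, which is what
# LUSetClusterParams relies on when refusing to disable LVM storage while
# such instances still exist.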

    
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  _OP_DEFS = [
    ("candidate_pool_size", None),
    ("uid_pool", None),
    ("add_uids", None),
    ("remove_uids", None),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err), errors.ECODE_INVAL)
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed",
                                   errors.ECODE_INVAL)

    _CheckBooleanOpField(self.op, "maintain_node_health")

    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

    
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist",
                                       errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
                                   errors.ECODE_INVAL)
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      if not isinstance(self.op.os_hvp, dict):
        raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
                                   errors.ECODE_INVAL)
      for os_name, hvs in self.op.os_hvp.items():
        if not isinstance(hvs, dict):
          raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
                                      " input"), errors.ECODE_INVAL)
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      if not isinstance(self.op.osparams, dict):
        raise errors.OpPrereqError("Invalid 'osparams' parameter on input",
                                   errors.ECODE_INVAL)
      for os_name, osp in self.op.osparams.items():
        if not isinstance(osp, dict):
          raise errors.OpPrereqError(("Invalid 'osparams' parameter on"
                                      " input"), errors.ECODE_INVAL)
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member",
                                   errors.ECODE_INVAL)
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs),
                                   errors.ECODE_INVAL)
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)
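  # Illustrative note (editorial annotation, not part of the original code):
  # os_hvp is nested as {os_name: {hypervisor: {param: value}}}, so an opcode
  # carrying {"debootstrap": {"xen-pvm": {"kernel_path": "/boot/vmlinuz"}}}
  # only overrides that one parameter; the merging loops above leave every
  # other OS/hypervisor combination untouched.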

    
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    self.cfg.Update(self.cluster, feedback_fn)

    
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)

    
def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
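# Illustrative note (editorial annotation, not part of the original code):
# each mstat entry is a block device status object; sync_percent becomes None
# once the mirror has finished resyncing, so a "done but still degraded"
# answer is re-polled up to degr_retries times (one second apart) before
# being reported to the caller as a real degradation.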

    
2841

    
2842
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2843
  """Check that mirrors are not degraded.
2844

2845
  The ldisk parameter, if True, will change the test from the
2846
  is_degraded attribute (which represents overall non-ok status for
2847
  the device(s)) to the ldisk (representing the local storage status).
2848

2849
  """
2850
  lu.cfg.SetDiskID(dev, node)
2851

    
2852
  result = True
2853

    
2854
  if on_primary or dev.AssembleOnSecondary():
2855
    rstats = lu.rpc.call_blockdev_find(node, dev)
2856
    msg = rstats.fail_msg
2857
    if msg:
2858
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2859
      result = False
2860
    elif not rstats.payload:
2861
      lu.LogWarning("Can't find disk on node %s", node)
2862
      result = False
2863
    else:
2864
      if ldisk:
2865
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2866
      else:
2867
        result = result and not rstats.payload.is_degraded
2868

    
2869
  if dev.children:
2870
    for child in dev.children:
2871
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2872

    
2873
  return result
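  # Editorial note (not in the original source): the recursion above always
  # uses the default ldisk=False for child devices, so the local-storage test
  # only applies to the top-level device. An illustrative call checking local
  # storage on the primary node would be:
  #   ok = _CheckDiskConsistency(lu, dev, instance.primary_node,
  #                              on_primary=True, ldisk=True)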


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants",
                                   "parameters", "api_versions")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []

    for os_name, os_data in pol.items():
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = list(variants)
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
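    # Editorial note (not in the original source): each row follows the order
    # of self.op.output_fields; e.g. for output_fields=["name", "valid",
    # "variants"] a row might look like ["debian-etch", True, ["minimal"]]
    # (values are illustrative only).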


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      # FIXME: this should be done via an rpc call to node daemon
      utils.RemoveHostFromEtcHosts(node.name)
      _RedistributeAncillaryFiles(self)
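    # Editorial note (not in the original source): the ordering above matters;
    # the node is dropped from the configuration and post hooks are run
    # *before* node_leave_cluster is called, so a failure on the remote side
    # only produces a warning and does not leave the node half-registered.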


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
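    # Editorial note (not in the original source): as with the OS query above,
    # each inner list mirrors self.op.output_fields, so a request for
    # ["name", "pinst_cnt", "role"] could yield e.g. ["node1", 2, "M"]
    # (example values are made up).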


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
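    # Editorial note (not in the original source): one row is emitted per
    # logical volume, not per node, and every value is stringified; the
    # "instance" column falls back to '-' when no instance maps the volume.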


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  _OP_DEFS = [("name", None)]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def CheckArguments(self):
    _CheckStorageType(self.op.storage_type)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
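    # Editorial note (not in the original source): SF_NAME is always included
    # in the RPC field list (it is needed for sorting), while SF_NODE and
    # SF_TYPE are stripped and filled in locally, so the remote call only ever
    # sees per-unit fields.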


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    _CheckStorageType(self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
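    # Editorial note (not in the original source): self.op.changes is a dict
    # of field -> new value, validated in CheckPrereq against
    # constants.MODIFIABLE_STORAGE_FIELDS for the given storage type; an
    # illustrative (hypothetical) example for an LVM physical volume would be
    #   changes = {constants.SF_ALLOCATABLE: False}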


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  _OP_DEFS = [("secondary_ip", None)]

  def CheckArguments(self):
    # validate/normalize the node name
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    if self.op.secondary_ip is None:
      self.op.secondary_ip = primary_ip
    if not utils.IsValidIP(self.op.secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    secondary_ip = self.op.secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      # FIXME: this should be done via an rpc call to node daemon
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())
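    # Editorial note (not in the original source): for a re-add the existing
    # Node object is updated and the configuration redistributed, whereas for
    # a fresh add the node only enters the configuration here, at the very
    # end, once ssh keys, /etc/hosts and the verification RPC have all
    # succeeded.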


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    _CheckBooleanOpField(self.op, 'auto_promote')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

    self.lock_all = self.op.auto_promote and self.might_demote


  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested node state changes against the current
    node flags and the cluster's master candidate pool.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)


    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result
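    # Editorial note (not in the original source): the returned list contains
    # (parameter, new value/reason) pairs, e.g. [("offline", "True"),
    # ("master_candidate", "auto-demotion due to offline")]; these exact
    # strings come from the branches above.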


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      }

    return result
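    # Editorial note (not in the original source): os_hvp is a nested mapping
    # of OS name -> hypervisor name -> parameters, already filtered down to
    # cluster.enabled_hypervisors by the loop above.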


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Collect and return the requested configuration values.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  _OP_DEFS = [("ignore_size", False)]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.
