#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201,C0302

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

# C0302: since we have waaaay too many lines in this module

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
from ganeti import netutils
from ganeti import ht

import ganeti.masterd.instance # pylint: disable-msg=W0611

# Common opcode attributes

#: output fields for a query operation
_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))


#: the shutdown timeout
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
                     ht.TPositiveInt)

#: the force parameter
_PForce = ("force", False, ht.TBool)

#: a required instance name (for single-instance LUs)
_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)


#: a required node name (for single-node LUs)
_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)

#: the migration type (live/non-live)
_PMigrationMode = ("mode", None,
                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))

#: the obsolete 'live' mode (boolean)
_PMigrationLive = ("live", None, ht.TMaybeBool)


# End types
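
# Illustrative sketch (not part of the original module): the _P* tuples above
# are meant to be dropped into an LU's _OP_PARAMS list, where the parameter
# loop in LogicalUnit.__init__ below fills in defaults and type-checks every
# entry. The class name and the extra parameter are hypothetical:
#
#   class LUExampleStartup(LogicalUnit):
#     _OP_PARAMS = [
#       _PInstanceName,                      # required, non-empty string
#       _PForce,                             # optional, defaults to False
#       _PShutdownTimeout,                   # positive int with a default
#       ("ignore_offline_nodes", False, ht.TBool),
#       ]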
class LogicalUnit(object):
90
  """Logical Unit base class.
91

92
  Subclasses must follow these rules:
93
    - implement ExpandNames
94
    - implement CheckPrereq (except when tasklets are used)
95
    - implement Exec (except when tasklets are used)
96
    - implement BuildHooksEnv
97
    - redefine HPATH and HTYPE
98
    - optionally redefine their run requirements:
99
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
100

101
  Note that all commands require root permissions.
102

103
  @ivar dry_run_result: the value (if any) that will be returned to the caller
104
      in dry-run mode (signalled by opcode dry_run parameter)
105
  @cvar _OP_PARAMS: a list of opcode attributes, their default values
106
      they should get if not already defined, and types they must match
107

108
  """
109
  HPATH = None
110
  HTYPE = None
111
  _OP_PARAMS = []
112
  REQ_BGL = True
113

    
114
  def __init__(self, processor, op, context, rpc):
115
    """Constructor for LogicalUnit.
116

117
    This needs to be overridden in derived classes in order to check op
118
    validity.
119

120
    """
121
    self.proc = processor
122
    self.op = op
123
    self.cfg = context.cfg
124
    self.context = context
125
    self.rpc = rpc
126
    # Dicts used to declare locking needs to mcpu
127
    self.needed_locks = None
128
    self.acquired_locks = {}
129
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
130
    self.add_locks = {}
131
    self.remove_locks = {}
132
    # Used to force good behavior when calling helper functions
133
    self.recalculate_locks = {}
134
    self.__ssh = None
135
    # logging
136
    self.Log = processor.Log # pylint: disable-msg=C0103
137
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
138
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
139
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
140
    # support for dry-run
141
    self.dry_run_result = None
142
    # support for generic debug attribute
143
    if (not hasattr(self.op, "debug_level") or
144
        not isinstance(self.op.debug_level, int)):
145
      self.op.debug_level = 0
146

    
147
    # Tasklets
148
    self.tasklets = None
149

    
150
    # The new kind-of-type-system
151
    op_id = self.op.OP_ID
152
    for attr_name, aval, test in self._OP_PARAMS:
153
      if not hasattr(op, attr_name):
154
        if aval == ht.NoDefault:
155
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
156
                                     (op_id, attr_name), errors.ECODE_INVAL)
157
        else:
158
          if callable(aval):
159
            dval = aval()
160
          else:
161
            dval = aval
162
          setattr(self.op, attr_name, dval)
163
      attr_val = getattr(op, attr_name)
164
      if test == ht.NoType:
165
        # no tests here
166
        continue
167
      if not callable(test):
168
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
169
                                     " given type is not a proper type (%s)" %
170
                                     (op_id, attr_name, test))
171
      if not test(attr_val):
172
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
173
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
174
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
175
                                   (op_id, attr_name), errors.ECODE_INVAL)
176

    
177
    self.CheckArguments()
178

    
179
  def __GetSSH(self):
180
    """Returns the SshRunner object
181

182
    """
183
    if not self.__ssh:
184
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
185
    return self.__ssh
186

    
187
  ssh = property(fget=__GetSSH)
188

    
189
  def CheckArguments(self):
190
    """Check syntactic validity for the opcode arguments.
191

192
    This method is for doing a simple syntactic check and ensuring the
193
    validity of opcode parameters, without any cluster-related
194
    checks. While the same can be accomplished in ExpandNames and/or
195
    CheckPrereq, doing these checks separately is better because:
196

197
      - ExpandNames is left as purely a lock-related function
198
      - CheckPrereq is run after we have acquired locks (and possibly
199
        waited for them)
200

201
    The function is allowed to change the self.op attribute so that
202
    later methods need no longer worry about missing parameters.
203

204
    """
205
    pass
206

    
207
  def ExpandNames(self):
208
    """Expand names for this LU.
209

210
    This method is called before starting to execute the opcode, and it should
211
    update all the parameters of the opcode to their canonical form (e.g. a
212
    short node name must be fully expanded after this method has successfully
213
    completed). This way locking, hooks, logging, etc. can work correctly.
214

215
    LUs which implement this method must also populate the self.needed_locks
216
    member, as a dict with lock levels as keys, and a list of needed lock names
217
    as values. Rules:
218

219
      - use an empty dict if you don't need any lock
220
      - if you don't need any lock at a particular level omit that level
221
      - don't put anything for the BGL level
222
      - if you want all locks at a level use locking.ALL_SET as a value
223

224
    If you need to share locks (rather than acquire them exclusively) at one
225
    level you can modify self.share_locks, setting a true value (usually 1) for
226
    that level. By default locks are not shared.
227

228
    This function can also define a list of tasklets, which then will be
229
    executed in order instead of the usual LU-level CheckPrereq and Exec
230
    functions, if those are not defined by the LU.
231

232
    Examples::
233

234
      # Acquire all nodes and one instance
235
      self.needed_locks = {
236
        locking.LEVEL_NODE: locking.ALL_SET,
237
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
238
      }
239
      # Acquire just two nodes
240
      self.needed_locks = {
241
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
242
      }
243
      # Acquire no locks
244
      self.needed_locks = {} # No, you can't leave it to the default value None
245

246
    """
247
    # The implementation of this method is mandatory only if the new LU is
248
    # concurrent, so that old LUs don't need to be changed all at the same
249
    # time.
250
    if self.REQ_BGL:
251
      self.needed_locks = {} # Exclusive LUs don't need locks.
252
    else:
253
      raise NotImplementedError
254

    
255
  def DeclareLocks(self, level):
256
    """Declare LU locking needs for a level
257

258
    While most LUs can just declare their locking needs at ExpandNames time,
259
    sometimes there's the need to calculate some locks after having acquired
260
    the ones before. This function is called just before acquiring locks at a
261
    particular level, but after acquiring the ones at lower levels, and permits
262
    such calculations. It can be used to modify self.needed_locks, and by
263
    default it does nothing.
264

265
    This function is only called if you have something already set in
266
    self.needed_locks for the level.
267

268
    @param level: Locking level which is going to be locked
269
    @type level: member of ganeti.locking.LEVELS
270

271
    """
272

    
273
  def CheckPrereq(self):
274
    """Check prerequisites for this LU.
275

276
    This method should check that the prerequisites for the execution
277
    of this LU are fulfilled. It can do internode communication, but
278
    it should be idempotent - no cluster or system changes are
279
    allowed.
280

281
    The method should raise errors.OpPrereqError in case something is
282
    not fulfilled. Its return value is ignored.
283

284
    This method should also update all the parameters of the opcode to
285
    their canonical form if it hasn't been done by ExpandNames before.
286

287
    """
288
    if self.tasklets is not None:
289
      for (idx, tl) in enumerate(self.tasklets):
290
        logging.debug("Checking prerequisites for tasklet %s/%s",
291
                      idx + 1, len(self.tasklets))
292
        tl.CheckPrereq()
293
    else:
294
      pass
295

    
296
  def Exec(self, feedback_fn):
297
    """Execute the LU.
298

299
    This method should implement the actual work. It should raise
300
    errors.OpExecError for failures that are somewhat dealt with in
301
    code, or expected.
302

303
    """
304
    if self.tasklets is not None:
305
      for (idx, tl) in enumerate(self.tasklets):
306
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
307
        tl.Exec(feedback_fn)
308
    else:
309
      raise NotImplementedError
310

    
311
  def BuildHooksEnv(self):
312
    """Build hooks environment for this LU.
313

314
    This method should return a three-element tuple consisting of: a dict
315
    containing the environment that will be used for running the
316
    specific hook for this LU, a list of node names on which the hook
317
    should run before the execution, and a list of node names on which
318
    the hook should run after the execution.
319

320
    The keys of the dict must not be prefixed with 'GANETI_' as this will
321
    be handled in the hooks runner. Also note additional keys will be
322
    added by the hooks runner. If the LU doesn't define any
323
    environment, an empty dict (and not None) should be returned.
324

325
    "No nodes" should be indicated by returning an empty list (and not None).
326

327
    Note that if the HPATH for a LU class is None, this function will
328
    not be called.
329

330
    """
331
    raise NotImplementedError
332

    
333
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
334
    """Notify the LU about the results of its hooks.
335

336
    This method is called every time a hooks phase is executed, and notifies
337
    the Logical Unit about the hooks' result. The LU can then use it to alter
338
    its result based on the hooks.  By default the method does nothing and the
339
    previous result is passed back unchanged but any LU can define it if it
340
    wants to use the local cluster hook-scripts somehow.
341

342
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
343
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
344
    @param hook_results: the results of the multi-node hooks rpc call
345
    @param feedback_fn: function used to send feedback back to the caller
346
    @param lu_result: the previous Exec result this LU had, or None
347
        in the PRE phase
348
    @return: the new Exec result, based on the previous result
349
        and hook results
350

351
    """
352
    # API must be kept, thus we ignore the unused-argument and
353
    # could-be-a-function warnings
354
    # pylint: disable-msg=W0613,R0201
355
    return lu_result
356

    
357
  def _ExpandAndLockInstance(self):
358
    """Helper function to expand and lock an instance.
359

360
    Many LUs that work on an instance take its name in self.op.instance_name
361
    and need to expand it and then declare the expanded name for locking. This
362
    function does it, and then updates self.op.instance_name to the expanded
363
    name. It also initializes needed_locks as a dict, if this hasn't been done
364
    before.
365

366
    """
367
    if self.needed_locks is None:
368
      self.needed_locks = {}
369
    else:
370
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
371
        "_ExpandAndLockInstance called with instance-level locks set"
372
    self.op.instance_name = _ExpandInstanceName(self.cfg,
373
                                                self.op.instance_name)
374
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
375

    
376
  def _LockInstancesNodes(self, primary_only=False):
377
    """Helper function to declare instances' nodes for locking.
378

379
    This function should be called after locking one or more instances to lock
380
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
381
    with all primary or secondary nodes for instances already locked and
382
    present in self.needed_locks[locking.LEVEL_INSTANCE].
383

384
    It should be called from DeclareLocks, and for safety only works if
385
    self.recalculate_locks[locking.LEVEL_NODE] is set.
386

387
    In the future it may grow parameters to just lock some instance's nodes, or
388
    to just lock primary or secondary nodes, if needed.
389

390
    It should be called in DeclareLocks in a way similar to::
391

392
      if level == locking.LEVEL_NODE:
393
        self._LockInstancesNodes()
394

395
    @type primary_only: boolean
396
    @param primary_only: only lock primary nodes of locked instances
397

398
    """
399
    assert locking.LEVEL_NODE in self.recalculate_locks, \
400
      "_LockInstancesNodes helper function called with no nodes to recalculate"
401

    
402
    # TODO: check if we've really been called with the instance locks held
403

    
404
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
405
    # future we might want to have different behaviors depending on the value
406
    # of self.recalculate_locks[locking.LEVEL_NODE]
407
    wanted_nodes = []
408
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
409
      instance = self.context.cfg.GetInstanceInfo(instance_name)
410
      wanted_nodes.append(instance.primary_node)
411
      if not primary_only:
412
        wanted_nodes.extend(instance.secondary_nodes)
413

    
414
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
415
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
416
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
417
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
418

    
419
    del self.recalculate_locks[locking.LEVEL_NODE]
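
# Illustrative sketch (hypothetical LU methods, not part of the original
# module) of the ExpandNames/DeclareLocks pairing that drives
# _LockInstancesNodes: the instance lock is declared first, and node locks are
# recalculated once the instance lock is held:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()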
420

    
421

    
422
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
423
  """Simple LU which runs no hooks.
424

425
  This LU is intended as a parent for other LogicalUnits which will
426
  run no hooks, in order to reduce duplicate code.
427

428
  """
429
  HPATH = None
430
  HTYPE = None
431

    
432
  def BuildHooksEnv(self):
433
    """Empty BuildHooksEnv for NoHooksLu.
434

435
    This just raises an error.
436

437
    """
438
    assert False, "BuildHooksEnv called for NoHooksLUs"
439

    
440

    
441
class Tasklet:
442
  """Tasklet base class.
443

444
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
445
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
446
  tasklets know nothing about locks.
447

448
  Subclasses must follow these rules:
449
    - Implement CheckPrereq
450
    - Implement Exec
451

452
  """
453
  def __init__(self, lu):
454
    self.lu = lu
455

    
456
    # Shortcuts
457
    self.cfg = lu.cfg
458
    self.rpc = lu.rpc
459

    
460
  def CheckPrereq(self):
461
    """Check prerequisites for this tasklets.
462

463
    This method should check whether the prerequisites for the execution of
464
    this tasklet are fulfilled. It can do internode communication, but it
465
    should be idempotent - no cluster or system changes are allowed.
466

467
    The method should raise errors.OpPrereqError in case something is not
468
    fulfilled. Its return value is ignored.
469

470
    This method should also update all parameters to their canonical form if it
471
    hasn't been done before.
472

473
    """
474
    pass
475

    
476
  def Exec(self, feedback_fn):
477
    """Execute the tasklet.
478

479
    This method should implement the actual work. It should raise
480
    errors.OpExecError for failures that are somewhat dealt with in code, or
481
    expected.
482

483
    """
484
    raise NotImplementedError
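
# A minimal sketch (hypothetical names, not part of the original module) of
# how an LU delegates its work to tasklets: ExpandNames builds the tasklet
# list, and the default CheckPrereq/Exec implementations in LogicalUnit then
# iterate over it.
class _ExampleNoopTasklet(Tasklet):
  """Tasklet that only reports the instance name it was given."""
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    # only checks are allowed here, no cluster changes
    pass

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on %s" % self.instance_name)


class _LUExampleNoop(NoHooksLU):
  """LU built entirely from tasklets; it defines no CheckPrereq/Exec."""
  _OP_PARAMS = [_PInstanceName]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.tasklets = [_ExampleNoopTasklet(self, self.op.instance_name)]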
485

    
486

    
487
def _GetWantedNodes(lu, nodes):
488
  """Returns list of checked and expanded node names.
489

490
  @type lu: L{LogicalUnit}
491
  @param lu: the logical unit on whose behalf we execute
492
  @type nodes: list
493
  @param nodes: list of node names or None for all nodes
494
  @rtype: list
495
  @return: the list of nodes, sorted
496
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
497

498
  """
499
  if not nodes:
500
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
501
      " non-empty list of nodes whose name is to be expanded.")
502

    
503
  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
504
  return utils.NiceSort(wanted)
505

    
506

    
507
def _GetWantedInstances(lu, instances):
508
  """Returns list of checked and expanded instance names.
509

510
  @type lu: L{LogicalUnit}
511
  @param lu: the logical unit on whose behalf we execute
512
  @type instances: list
513
  @param instances: list of instance names or None for all instances
514
  @rtype: list
515
  @return: the list of instances, sorted
516
  @raise errors.OpPrereqError: if the instances parameter is wrong type
517
  @raise errors.OpPrereqError: if any of the passed instances is not found
518

519
  """
520
  if instances:
521
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
522
  else:
523
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
524
  return wanted
525

    
526

    
527
def _GetUpdatedParams(old_params, update_dict,
528
                      use_default=True, use_none=False):
529
  """Return the new version of a parameter dictionary.
530

531
  @type old_params: dict
532
  @param old_params: old parameters
533
  @type update_dict: dict
534
  @param update_dict: dict containing new parameter values, or
535
      constants.VALUE_DEFAULT to reset the parameter to its default
536
      value
537
  @type use_default: boolean
538
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
539
      values as 'to be deleted' values
540
  @type use_none: boolean
541
  @param use_none: whether to recognise C{None} values as 'to be
542
      deleted' values
543
  @rtype: dict
544
  @return: the new parameter dictionary
545

546
  """
547
  params_copy = copy.deepcopy(old_params)
548
  for key, val in update_dict.iteritems():
549
    if ((use_default and val == constants.VALUE_DEFAULT) or
550
        (use_none and val is None)):
551
      try:
552
        del params_copy[key]
553
      except KeyError:
554
        pass
555
    else:
556
      params_copy[key] = val
557
  return params_copy
558

    
559

    
560
def _CheckOutputFields(static, dynamic, selected):
561
  """Checks whether all selected fields are valid.
562

563
  @type static: L{utils.FieldSet}
564
  @param static: static fields set
565
  @type dynamic: L{utils.FieldSet}
566
  @param dynamic: dynamic fields set
567

568
  """
569
  f = utils.FieldSet()
570
  f.Extend(static)
571
  f.Extend(dynamic)
572

    
573
  delta = f.NonMatching(selected)
574
  if delta:
575
    raise errors.OpPrereqError("Unknown output fields selected: %s"
576
                               % ",".join(delta), errors.ECODE_INVAL)
577

    
578

    
579
def _CheckGlobalHvParams(params):
580
  """Validates that given hypervisor params are not global ones.
581

582
  This will ensure that instances don't get customised versions of
583
  global params.
584

585
  """
586
  used_globals = constants.HVC_GLOBALS.intersection(params)
587
  if used_globals:
588
    msg = ("The following hypervisor parameters are global and cannot"
589
           " be customized at instance level, please modify them at"
590
           " cluster level: %s" % utils.CommaJoin(used_globals))
591
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
592

    
593

    
594
def _CheckNodeOnline(lu, node):
595
  """Ensure that a given node is online.
596

597
  @param lu: the LU on behalf of which we make the check
598
  @param node: the node to check
599
  @raise errors.OpPrereqError: if the node is offline
600

601
  """
602
  if lu.cfg.GetNodeInfo(node).offline:
603
    raise errors.OpPrereqError("Can't use offline node %s" % node,
604
                               errors.ECODE_INVAL)
605

    
606

    
607
def _CheckNodeNotDrained(lu, node):
608
  """Ensure that a given node is not drained.
609

610
  @param lu: the LU on behalf of which we make the check
611
  @param node: the node to check
612
  @raise errors.OpPrereqError: if the node is drained
613

614
  """
615
  if lu.cfg.GetNodeInfo(node).drained:
616
    raise errors.OpPrereqError("Can't use drained node %s" % node,
617
                               errors.ECODE_INVAL)
618

    
619

    
620
def _CheckNodeHasOS(lu, node, os_name, force_variant):
621
  """Ensure that a node supports a given OS.
622

623
  @param lu: the LU on behalf of which we make the check
624
  @param node: the node to check
625
  @param os_name: the OS to query about
626
  @param force_variant: whether to ignore variant errors
627
  @raise errors.OpPrereqError: if the node does not support the OS
628

629
  """
630
  result = lu.rpc.call_os_get(node, os_name)
631
  result.Raise("OS '%s' not in supported OS list for node %s" %
632
               (os_name, node),
633
               prereq=True, ecode=errors.ECODE_INVAL)
634
  if not force_variant:
635
    _CheckOSVariant(result.payload, os_name)
636

    
637

    
638
def _RequireFileStorage():
639
  """Checks that file storage is enabled.
640

641
  @raise errors.OpPrereqError: when file storage is disabled
642

643
  """
644
  if not constants.ENABLE_FILE_STORAGE:
645
    raise errors.OpPrereqError("File storage disabled at configure time",
646
                               errors.ECODE_INVAL)
647

    
648

    
649
def _CheckDiskTemplate(template):
650
  """Ensure a given disk template is valid.
651

652
  """
653
  if template not in constants.DISK_TEMPLATES:
654
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
655
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
656
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
657
  if template == constants.DT_FILE:
658
    _RequireFileStorage()
659
  return True
660

    
661

    
662
def _CheckStorageType(storage_type):
663
  """Ensure a given storage type is valid.
664

665
  """
666
  if storage_type not in constants.VALID_STORAGE_TYPES:
667
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
668
                               errors.ECODE_INVAL)
669
  if storage_type == constants.ST_FILE:
670
    _RequireFileStorage()
671
  return True
672

    
673

    
674
def _GetClusterDomainSecret():
675
  """Reads the cluster domain secret.
676

677
  """
678
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
679
                               strict=True)
680

    
681

    
682
def _CheckInstanceDown(lu, instance, reason):
683
  """Ensure that an instance is not running."""
684
  if instance.admin_up:
685
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
686
                               (instance.name, reason), errors.ECODE_STATE)
687

    
688
  pnode = instance.primary_node
689
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
690
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
691
              prereq=True, ecode=errors.ECODE_ENVIRON)
692

    
693
  if instance.name in ins_l.payload:
694
    raise errors.OpPrereqError("Instance %s is running, %s" %
695
                               (instance.name, reason), errors.ECODE_STATE)
696

    
697

    
698
def _ExpandItemName(fn, name, kind):
699
  """Expand an item name.
700

701
  @param fn: the function to use for expansion
702
  @param name: requested item name
703
  @param kind: text description ('Node' or 'Instance')
704
  @return: the resolved (full) name
705
  @raise errors.OpPrereqError: if the item is not found
706

707
  """
708
  full_name = fn(name)
709
  if full_name is None:
710
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
711
                               errors.ECODE_NOENT)
712
  return full_name
713

    
714

    
715
def _ExpandNodeName(cfg, name):
716
  """Wrapper over L{_ExpandItemName} for nodes."""
717
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
718

    
719

    
720
def _ExpandInstanceName(cfg, name):
721
  """Wrapper over L{_ExpandItemName} for instance."""
722
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
723

    
724

    
725
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
726
                          memory, vcpus, nics, disk_template, disks,
727
                          bep, hvp, hypervisor_name):
728
  """Builds instance related env variables for hooks
729

730
  This builds the hook environment from individual variables.
731

732
  @type name: string
733
  @param name: the name of the instance
734
  @type primary_node: string
735
  @param primary_node: the name of the instance's primary node
736
  @type secondary_nodes: list
737
  @param secondary_nodes: list of secondary nodes as strings
738
  @type os_type: string
739
  @param os_type: the name of the instance's OS
740
  @type status: boolean
741
  @param status: the should_run status of the instance
742
  @type memory: string
743
  @param memory: the memory size of the instance
744
  @type vcpus: string
745
  @param vcpus: the count of VCPUs the instance has
746
  @type nics: list
747
  @param nics: list of tuples (ip, mac, mode, link) representing
748
      the NICs the instance has
749
  @type disk_template: string
750
  @param disk_template: the disk template of the instance
751
  @type disks: list
752
  @param disks: the list of (size, mode) pairs
753
  @type bep: dict
754
  @param bep: the backend parameters for the instance
755
  @type hvp: dict
756
  @param hvp: the hypervisor parameters for the instance
757
  @type hypervisor_name: string
758
  @param hypervisor_name: the hypervisor for the instance
759
  @rtype: dict
760
  @return: the hook environment for this instance
761

762
  """
763
  if status:
764
    str_status = "up"
765
  else:
766
    str_status = "down"
767
  env = {
768
    "OP_TARGET": name,
769
    "INSTANCE_NAME": name,
770
    "INSTANCE_PRIMARY": primary_node,
771
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
772
    "INSTANCE_OS_TYPE": os_type,
773
    "INSTANCE_STATUS": str_status,
774
    "INSTANCE_MEMORY": memory,
775
    "INSTANCE_VCPUS": vcpus,
776
    "INSTANCE_DISK_TEMPLATE": disk_template,
777
    "INSTANCE_HYPERVISOR": hypervisor_name,
778
  }
779

    
780
  if nics:
781
    nic_count = len(nics)
782
    for idx, (ip, mac, mode, link) in enumerate(nics):
783
      if ip is None:
784
        ip = ""
785
      env["INSTANCE_NIC%d_IP" % idx] = ip
786
      env["INSTANCE_NIC%d_MAC" % idx] = mac
787
      env["INSTANCE_NIC%d_MODE" % idx] = mode
788
      env["INSTANCE_NIC%d_LINK" % idx] = link
789
      if mode == constants.NIC_MODE_BRIDGED:
790
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
791
  else:
792
    nic_count = 0
793

    
794
  env["INSTANCE_NIC_COUNT"] = nic_count
795

    
796
  if disks:
797
    disk_count = len(disks)
798
    for idx, (size, mode) in enumerate(disks):
799
      env["INSTANCE_DISK%d_SIZE" % idx] = size
800
      env["INSTANCE_DISK%d_MODE" % idx] = mode
801
  else:
802
    disk_count = 0
803

    
804
  env["INSTANCE_DISK_COUNT"] = disk_count
805

    
806
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
807
    for key, value in source.items():
808
      env["INSTANCE_%s_%s" % (kind, key)] = value
809

    
810
  return env
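
# Illustrative sketch (hypothetical values, not part of the original module):
# what the hook environment built above looks like for a small DRBD instance.
def _ExampleBuildInstanceHookEnv():
  """Build a sample hook environment and spot-check a few keys."""
  env = _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
                              ["node2.example.com"], "debian-image", True,
                              128, 1,
                              [("198.51.100.10", "aa:00:00:00:00:01",
                                constants.NIC_MODE_BRIDGED, "xen-br0")],
                              constants.DT_DRBD8, [(10240, "rw")],
                              {}, {}, constants.HT_XEN_PVM)
  assert env["INSTANCE_STATUS"] == "up"
  assert env["INSTANCE_NIC0_BRIDGE"] == "xen-br0"
  assert env["INSTANCE_DISK_COUNT"] == 1
  return env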
811

    
812

    
813
def _NICListToTuple(lu, nics):
814
  """Build a list of nic information tuples.
815

816
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
817
  value in LUQueryInstanceData.
818

819
  @type lu:  L{LogicalUnit}
820
  @param lu: the logical unit on whose behalf we execute
821
  @type nics: list of L{objects.NIC}
822
  @param nics: list of nics to convert to hooks tuples
823

824
  """
825
  hooks_nics = []
826
  cluster = lu.cfg.GetClusterInfo()
827
  for nic in nics:
828
    ip = nic.ip
829
    mac = nic.mac
830
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
831
    mode = filled_params[constants.NIC_MODE]
832
    link = filled_params[constants.NIC_LINK]
833
    hooks_nics.append((ip, mac, mode, link))
834
  return hooks_nics
835

    
836

    
837
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
838
  """Builds instance related env variables for hooks from an object.
839

840
  @type lu: L{LogicalUnit}
841
  @param lu: the logical unit on whose behalf we execute
842
  @type instance: L{objects.Instance}
843
  @param instance: the instance for which we should build the
844
      environment
845
  @type override: dict
846
  @param override: dictionary with key/values that will override
847
      our values
848
  @rtype: dict
849
  @return: the hook environment dictionary
850

851
  """
852
  cluster = lu.cfg.GetClusterInfo()
853
  bep = cluster.FillBE(instance)
854
  hvp = cluster.FillHV(instance)
855
  args = {
856
    'name': instance.name,
857
    'primary_node': instance.primary_node,
858
    'secondary_nodes': instance.secondary_nodes,
859
    'os_type': instance.os,
860
    'status': instance.admin_up,
861
    'memory': bep[constants.BE_MEMORY],
862
    'vcpus': bep[constants.BE_VCPUS],
863
    'nics': _NICListToTuple(lu, instance.nics),
864
    'disk_template': instance.disk_template,
865
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
866
    'bep': bep,
867
    'hvp': hvp,
868
    'hypervisor_name': instance.hypervisor,
869
  }
870
  if override:
871
    args.update(override)
872
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
873

    
874

    
875
def _AdjustCandidatePool(lu, exceptions):
876
  """Adjust the candidate pool after node operations.
877

878
  """
879
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
880
  if mod_list:
881
    lu.LogInfo("Promoted nodes to master candidate role: %s",
882
               utils.CommaJoin(node.name for node in mod_list))
883
    for name in mod_list:
884
      lu.context.ReaddNode(name)
885
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
886
  if mc_now > mc_max:
887
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
888
               (mc_now, mc_max))
889

    
890

    
891
def _DecideSelfPromotion(lu, exceptions=None):
892
  """Decide whether I should promote myself as a master candidate.
893

894
  """
895
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
896
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
897
  # the new node will increase mc_max by one, so:
898
  mc_should = min(mc_should + 1, cp_size)
899
  return mc_now < mc_should
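
# Worked example (illustrative numbers, not from the original code): with
# candidate_pool_size=10, mc_now=3 and mc_should=3 as reported by the config,
# the new node bumps mc_should to min(3 + 1, 10) = 4, so 3 < 4 and the node
# promotes itself; once the pool already holds 10 candidates,
# min(10 + 1, 10) stays at 10 and no further promotion happens.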
900

    
901

    
902
def _CheckNicsBridgesExist(lu, target_nics, target_node):
903
  """Check that the brigdes needed by a list of nics exist.
904

905
  """
906
  cluster = lu.cfg.GetClusterInfo()
907
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
908
  brlist = [params[constants.NIC_LINK] for params in paramslist
909
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
910
  if brlist:
911
    result = lu.rpc.call_bridges_exist(target_node, brlist)
912
    result.Raise("Error checking bridges on destination node '%s'" %
913
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
914

    
915

    
916
def _CheckInstanceBridgesExist(lu, instance, node=None):
917
  """Check that the brigdes needed by an instance exist.
918

919
  """
920
  if node is None:
921
    node = instance.primary_node
922
  _CheckNicsBridgesExist(lu, instance.nics, node)
923

    
924

    
925
def _CheckOSVariant(os_obj, name):
926
  """Check whether an OS name conforms to the os variants specification.
927

928
  @type os_obj: L{objects.OS}
929
  @param os_obj: OS object to check
930
  @type name: string
931
  @param name: OS name passed by the user, to check for validity
932

933
  """
934
  if not os_obj.supported_variants:
935
    return
936
  variant = objects.OS.GetVariant(name)
937
  if not variant:
938
    raise errors.OpPrereqError("OS name must include a variant",
939
                               errors.ECODE_INVAL)
940

    
941
  if variant not in os_obj.supported_variants:
942
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
943

    
944

    
945
def _GetNodeInstancesInner(cfg, fn):
946
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
947

    
948

    
949
def _GetNodeInstances(cfg, node_name):
950
  """Returns a list of all primary and secondary instances on a node.
951

952
  """
953

    
954
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
955

    
956

    
957
def _GetNodePrimaryInstances(cfg, node_name):
958
  """Returns primary instances on a node.
959

960
  """
961
  return _GetNodeInstancesInner(cfg,
962
                                lambda inst: node_name == inst.primary_node)
963

    
964

    
965
def _GetNodeSecondaryInstances(cfg, node_name):
966
  """Returns secondary instances on a node.
967

968
  """
969
  return _GetNodeInstancesInner(cfg,
970
                                lambda inst: node_name in inst.secondary_nodes)
971

    
972

    
973
def _GetStorageTypeArgs(cfg, storage_type):
974
  """Returns the arguments for a storage type.
975

976
  """
977
  # Special case for file storage
978
  if storage_type == constants.ST_FILE:
979
    # storage.FileStorage wants a list of storage directories
980
    return [[cfg.GetFileStorageDir()]]
981

    
982
  return []
983

    
984

    
985
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
986
  faulty = []
987

    
988
  for dev in instance.disks:
989
    cfg.SetDiskID(dev, node_name)
990

    
991
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
992
  result.Raise("Failed to get disk status from node %s" % node_name,
993
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
994

    
995
  for idx, bdev_status in enumerate(result.payload):
996
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
997
      faulty.append(idx)
998

    
999
  return faulty
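
# Illustrative example (not from the original code): for an instance with
# three disks where only the second one reports constants.LDS_FAULTY, the
# helper above returns [1] - the indices of the faulty disks, in disk order.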
1000

    
1001

    
1002
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1003
  """Check the sanity of iallocator and node arguments and use the
1004
  cluster-wide iallocator if appropriate.
1005

1006
  Check that at most one of (iallocator, node) is specified. If none is
1007
  specified, then the LU's opcode's iallocator slot is filled with the
1008
  cluster-wide default iallocator.
1009

1010
  @type iallocator_slot: string
1011
  @param iallocator_slot: the name of the opcode iallocator slot
1012
  @type node_slot: string
1013
  @param node_slot: the name of the opcode target node slot
1014

1015
  """
1016
  node = getattr(lu.op, node_slot, None)
1017
  iallocator = getattr(lu.op, iallocator_slot, None)
1018

    
1019
  if node is not None and iallocator is not None:
1020
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1021
                               errors.ECODE_INVAL)
1022
  elif node is None and iallocator is None:
1023
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1024
    if default_iallocator:
1025
      setattr(lu.op, iallocator_slot, default_iallocator)
1026
    else:
1027
      raise errors.OpPrereqError("No iallocator or node given and no"
1028
                                 " cluster-wide default iallocator found."
1029
                                 " Please specify either an iallocator or a"
1030
                                 " node, or set a cluster-wide default"
1031
                                 " iallocator.")
1032

    
1033

    
1034
class LUPostInitCluster(LogicalUnit):
1035
  """Logical unit for running hooks after cluster initialization.
1036

1037
  """
1038
  HPATH = "cluster-init"
1039
  HTYPE = constants.HTYPE_CLUSTER
1040

    
1041
  def BuildHooksEnv(self):
1042
    """Build hooks env.
1043

1044
    """
1045
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1046
    mn = self.cfg.GetMasterNode()
1047
    return env, [], [mn]
1048

    
1049
  def Exec(self, feedback_fn):
1050
    """Nothing to do.
1051

1052
    """
1053
    return True
1054

    
1055

    
1056
class LUDestroyCluster(LogicalUnit):
1057
  """Logical unit for destroying the cluster.
1058

1059
  """
1060
  HPATH = "cluster-destroy"
1061
  HTYPE = constants.HTYPE_CLUSTER
1062

    
1063
  def BuildHooksEnv(self):
1064
    """Build hooks env.
1065

1066
    """
1067
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1068
    return env, [], []
1069

    
1070
  def CheckPrereq(self):
1071
    """Check prerequisites.
1072

1073
    This checks whether the cluster is empty.
1074

1075
    Any errors are signaled by raising errors.OpPrereqError.
1076

1077
    """
1078
    master = self.cfg.GetMasterNode()
1079

    
1080
    nodelist = self.cfg.GetNodeList()
1081
    if len(nodelist) != 1 or nodelist[0] != master:
1082
      raise errors.OpPrereqError("There are still %d node(s) in"
1083
                                 " this cluster." % (len(nodelist) - 1),
1084
                                 errors.ECODE_INVAL)
1085
    instancelist = self.cfg.GetInstanceList()
1086
    if instancelist:
1087
      raise errors.OpPrereqError("There are still %d instance(s) in"
1088
                                 " this cluster." % len(instancelist),
1089
                                 errors.ECODE_INVAL)
1090

    
1091
  def Exec(self, feedback_fn):
1092
    """Destroys the cluster.
1093

1094
    """
1095
    master = self.cfg.GetMasterNode()
1096

    
1097
    # Run post hooks on master node before it's removed
1098
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
1099
    try:
1100
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
1101
    except:
1102
      # pylint: disable-msg=W0702
1103
      self.LogWarning("Errors occurred running hooks on %s" % master)
1104

    
1105
    result = self.rpc.call_node_stop_master(master, False)
1106
    result.Raise("Could not disable the master role")
1107

    
1108
    return master
1109

    
1110

    
1111
def _VerifyCertificate(filename):
1112
  """Verifies a certificate for LUVerifyCluster.
1113

1114
  @type filename: string
1115
  @param filename: Path to PEM file
1116

1117
  """
1118
  try:
1119
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1120
                                           utils.ReadFile(filename))
1121
  except Exception, err: # pylint: disable-msg=W0703
1122
    return (LUVerifyCluster.ETYPE_ERROR,
1123
            "Failed to load X509 certificate %s: %s" % (filename, err))
1124

    
1125
  (errcode, msg) = \
1126
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1127
                                constants.SSL_CERT_EXPIRATION_ERROR)
1128

    
1129
  if msg:
1130
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1131
  else:
1132
    fnamemsg = None
1133

    
1134
  if errcode is None:
1135
    return (None, fnamemsg)
1136
  elif errcode == utils.CERT_WARNING:
1137
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
1138
  elif errcode == utils.CERT_ERROR:
1139
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
1140

    
1141
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
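
# Illustrative summary (not part of the original code) of what
# _VerifyCertificate returns:
#
#   (None, None)                          -> the certificate loads and is valid
#   (LUVerifyCluster.ETYPE_WARNING, msg)  -> valid but close to expiration
#   (LUVerifyCluster.ETYPE_ERROR, msg)    -> expired, otherwise invalid, or the
#                                            PEM file could not be loaded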
1142

    
1143

    
1144
class LUVerifyCluster(LogicalUnit):
1145
  """Verifies the cluster status.
1146

1147
  """
1148
  HPATH = "cluster-verify"
1149
  HTYPE = constants.HTYPE_CLUSTER
1150
  _OP_PARAMS = [
1151
    ("skip_checks", ht.EmptyList,
1152
     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1153
    ("verbose", False, ht.TBool),
1154
    ("error_codes", False, ht.TBool),
1155
    ("debug_simulate_errors", False, ht.TBool),
1156
    ]
1157
  REQ_BGL = False
1158

    
1159
  TCLUSTER = "cluster"
1160
  TNODE = "node"
1161
  TINSTANCE = "instance"
1162

    
1163
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1164
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1165
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1166
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1167
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1168
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1170
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1171
  ENODEDRBD = (TNODE, "ENODEDRBD")
1172
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1173
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1174
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1175
  ENODEHV = (TNODE, "ENODEHV")
1176
  ENODELVM = (TNODE, "ENODELVM")
1177
  ENODEN1 = (TNODE, "ENODEN1")
1178
  ENODENET = (TNODE, "ENODENET")
1179
  ENODEOS = (TNODE, "ENODEOS")
1180
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1181
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1182
  ENODERPC = (TNODE, "ENODERPC")
1183
  ENODESSH = (TNODE, "ENODESSH")
1184
  ENODEVERSION = (TNODE, "ENODEVERSION")
1185
  ENODESETUP = (TNODE, "ENODESETUP")
1186
  ENODETIME = (TNODE, "ENODETIME")
1187

    
1188
  ETYPE_FIELD = "code"
1189
  ETYPE_ERROR = "ERROR"
1190
  ETYPE_WARNING = "WARNING"
1191

    
1192
  class NodeImage(object):
1193
    """A class representing the logical and physical status of a node.
1194

1195
    @type name: string
1196
    @ivar name: the node name to which this object refers
1197
    @ivar volumes: a structure as returned from
1198
        L{ganeti.backend.GetVolumeList} (runtime)
1199
    @ivar instances: a list of running instances (runtime)
1200
    @ivar pinst: list of configured primary instances (config)
1201
    @ivar sinst: list of configured secondary instances (config)
1202
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
1203
        of this node (config)
1204
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1205
    @ivar dfree: free disk, as reported by the node (runtime)
1206
    @ivar offline: the offline status (config)
1207
    @type rpc_fail: boolean
1208
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1209
        not whether the individual keys were correct) (runtime)
1210
    @type lvm_fail: boolean
1211
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1212
    @type hyp_fail: boolean
1213
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1214
    @type ghost: boolean
1215
    @ivar ghost: whether this is a known node or not (config)
1216
    @type os_fail: boolean
1217
    @ivar os_fail: whether the RPC call didn't return valid OS data
1218
    @type oslist: list
1219
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1220

1221
    """
1222
    def __init__(self, offline=False, name=None):
1223
      self.name = name
1224
      self.volumes = {}
1225
      self.instances = []
1226
      self.pinst = []
1227
      self.sinst = []
1228
      self.sbp = {}
1229
      self.mfree = 0
1230
      self.dfree = 0
1231
      self.offline = offline
1232
      self.rpc_fail = False
1233
      self.lvm_fail = False
1234
      self.hyp_fail = False
1235
      self.ghost = False
1236
      self.os_fail = False
1237
      self.oslist = {}
1238

    
1239
  def ExpandNames(self):
1240
    self.needed_locks = {
1241
      locking.LEVEL_NODE: locking.ALL_SET,
1242
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1243
    }
1244
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1245

    
1246
  def _Error(self, ecode, item, msg, *args, **kwargs):
1247
    """Format an error message.
1248

1249
    Based on the opcode's error_codes parameter, either format a
1250
    parseable error code, or a simpler error string.
1251

1252
    This must be called only from Exec and functions called from Exec.
1253

1254
    """
1255
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1256
    itype, etxt = ecode
1257
    # first complete the msg
1258
    if args:
1259
      msg = msg % args
1260
    # then format the whole message
1261
    if self.op.error_codes:
1262
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1263
    else:
1264
      if item:
1265
        item = " " + item
1266
      else:
1267
        item = ""
1268
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1269
    # and finally report it via the feedback_fn
1270
    self._feedback_fn("  - %s" % msg)
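
    # Example of the two message shapes produced above (illustrative values):
    # with op.error_codes enabled an instance error comes out as
    #   - ERROR:EINSTANCEDOWN:instance:inst1.example.com:instance not running
    # while the default human-readable form is
    #   - ERROR: instance inst1.example.com: instance not running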
1271

    
1272
  def _ErrorIf(self, cond, *args, **kwargs):
1273
    """Log an error message if the passed condition is True.
1274

1275
    """
1276
    cond = bool(cond) or self.op.debug_simulate_errors
1277
    if cond:
1278
      self._Error(*args, **kwargs)
1279
    # do not mark the operation as failed for WARN cases only
1280
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1281
      self.bad = self.bad or cond
1282

    
1283
  def _VerifyNode(self, ninfo, nresult):
1284
    """Perform some basic validation on data returned from a node.
1285

1286
      - check the result data structure is well formed and has all the
1287
        mandatory fields
1288
      - check ganeti version
1289

1290
    @type ninfo: L{objects.Node}
1291
    @param ninfo: the node to check
1292
    @param nresult: the results from the node
1293
    @rtype: boolean
1294
    @return: whether overall this call was successful (and we can expect
1295
         reasonable values in the response)
1296

1297
    """
1298
    node = ninfo.name
1299
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1300

    
1301
    # main result, nresult should be a non-empty dict
1302
    test = not nresult or not isinstance(nresult, dict)
1303
    _ErrorIf(test, self.ENODERPC, node,
1304
                  "unable to verify node: no data returned")
1305
    if test:
1306
      return False
1307

    
1308
    # compares ganeti version
1309
    local_version = constants.PROTOCOL_VERSION
1310
    remote_version = nresult.get("version", None)
1311
    test = not (remote_version and
1312
                isinstance(remote_version, (list, tuple)) and
1313
                len(remote_version) == 2)
1314
    _ErrorIf(test, self.ENODERPC, node,
1315
             "connection to node returned invalid data")
1316
    if test:
1317
      return False
1318

    
1319
    test = local_version != remote_version[0]
1320
    _ErrorIf(test, self.ENODEVERSION, node,
1321
             "incompatible protocol versions: master %s,"
1322
             " node %s", local_version, remote_version[0])
1323
    if test:
1324
      return False
1325

    
1326
    # node seems compatible, we can actually try to look into its results
1327

    
1328
    # full package version
1329
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1330
                  self.ENODEVERSION, node,
1331
                  "software version mismatch: master %s, node %s",
1332
                  constants.RELEASE_VERSION, remote_version[1],
1333
                  code=self.ETYPE_WARNING)
1334

    
1335
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1336
    if isinstance(hyp_result, dict):
1337
      for hv_name, hv_result in hyp_result.iteritems():
1338
        test = hv_result is not None
1339
        _ErrorIf(test, self.ENODEHV, node,
1340
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1341

    
1342

    
1343
    test = nresult.get(constants.NV_NODESETUP,
1344
                           ["Missing NODESETUP results"])
1345
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1346
             "; ".join(test))
1347

    
1348
    return True
1349

    
1350
  def _VerifyNodeTime(self, ninfo, nresult,
1351
                      nvinfo_starttime, nvinfo_endtime):
1352
    """Check the node time.
1353

1354
    @type ninfo: L{objects.Node}
1355
    @param ninfo: the node to check
1356
    @param nresult: the remote results for the node
1357
    @param nvinfo_starttime: the start time of the RPC call
1358
    @param nvinfo_endtime: the end time of the RPC call
1359

1360
    """
1361
    node = ninfo.name
1362
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1363

    
1364
    ntime = nresult.get(constants.NV_TIME, None)
1365
    try:
1366
      ntime_merged = utils.MergeTime(ntime)
1367
    except (ValueError, TypeError):
1368
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1369
      return
1370

    
1371
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1372
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1373
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1374
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1375
    else:
1376
      ntime_diff = None
1377

    
1378
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1379
             "Node time diverges by at least %s from master node time",
1380
             ntime_diff)
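
    # Worked example (illustrative numbers, assuming a NODE_MAX_CLOCK_SKEW of
    # 150 seconds): for a verify RPC spanning [1000.0, 1002.0], a node
    # reporting time 1100.0 is accepted, while a node reporting 700.0 falls
    # below 1000.0 - 150 and is flagged as diverging by at least
    # abs(1000.0 - 700.0) = 300.0s.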
1381

    
1382
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1383
    """Check the node time.
1384

1385
    @type ninfo: L{objects.Node}
1386
    @param ninfo: the node to check
1387
    @param nresult: the remote results for the node
1388
    @param vg_name: the configured VG name
1389

1390
    """
1391
    if vg_name is None:
1392
      return
1393

    
1394
    node = ninfo.name
1395
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1396

    
1397
    # checks vg existence and size > 20G
1398
    vglist = nresult.get(constants.NV_VGLIST, None)
1399
    test = not vglist
1400
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1401
    if not test:
1402
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1403
                                            constants.MIN_VG_SIZE)
1404
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1405

    
1406
    # check pv names
1407
    pvlist = nresult.get(constants.NV_PVLIST, None)
1408
    test = pvlist is None
1409
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1410
    if not test:
1411
      # check that ':' is not present in PV names, since it's a
1412
      # special character for lvcreate (denotes the range of PEs to
1413
      # use on the PV)
1414
      for _, pvname, owner_vg in pvlist:
1415
        test = ":" in pvname
1416
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1417
                 " '%s' of VG '%s'", pvname, owner_vg)
1418

    
1419
  def _VerifyNodeNetwork(self, ninfo, nresult):
1420
    """Check the node time.
1421

1422
    @type ninfo: L{objects.Node}
1423
    @param ninfo: the node to check
1424
    @param nresult: the remote results for the node
1425

1426
    """
1427
    node = ninfo.name
1428
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1429

    
1430
    test = constants.NV_NODELIST not in nresult
1431
    _ErrorIf(test, self.ENODESSH, node,
1432
             "node hasn't returned node ssh connectivity data")
1433
    if not test:
1434
      if nresult[constants.NV_NODELIST]:
1435
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1436
          _ErrorIf(True, self.ENODESSH, node,
1437
                   "ssh communication with node '%s': %s", a_node, a_msg)
1438

    
1439
    test = constants.NV_NODENETTEST not in nresult
1440
    _ErrorIf(test, self.ENODENET, node,
1441
             "node hasn't returned node tcp connectivity data")
1442
    if not test:
1443
      if nresult[constants.NV_NODENETTEST]:
1444
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1445
        for anode in nlist:
1446
          _ErrorIf(True, self.ENODENET, node,
1447
                   "tcp communication with node '%s': %s",
1448
                   anode, nresult[constants.NV_NODENETTEST][anode])
1449

    
1450
    test = constants.NV_MASTERIP not in nresult
1451
    _ErrorIf(test, self.ENODENET, node,
1452
             "node hasn't returned node master IP reachability data")
1453
    if not test:
1454
      if not nresult[constants.NV_MASTERIP]:
1455
        if node == self.master_node:
1456
          msg = "the master node cannot reach the master IP (not configured?)"
1457
        else:
1458
          msg = "cannot reach the master IP"
1459
        _ErrorIf(True, self.ENODENET, node, msg)
1460

    
1461

    
1462
  def _VerifyInstance(self, instance, instanceconfig, node_image):
1463
    """Verify an instance.
1464

1465
    This function checks to see if the required block devices are
1466
    available on the instance's node.
1467

1468
    """
1469
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1470
    node_current = instanceconfig.primary_node
1471

    
1472
    node_vol_should = {}
1473
    instanceconfig.MapLVsByNode(node_vol_should)
1474

    
1475
    for node in node_vol_should:
1476
      n_img = node_image[node]
1477
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1478
        # ignore missing volumes on offline or broken nodes
1479
        continue
1480
      for volume in node_vol_should[node]:
1481
        test = volume not in n_img.volumes
1482
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1483
                 "volume %s missing on node %s", volume, node)
1484

    
1485
    if instanceconfig.admin_up:
1486
      pri_img = node_image[node_current]
1487
      test = instance not in pri_img.instances and not pri_img.offline
1488
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1489
               "instance not running on its primary node %s",
1490
               node_current)
1491

    
1492
    for node, n_img in node_image.items():
1493
      if (not node == node_current):
1494
        test = instance in n_img.instances
1495
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1496
                 "instance should not run on node %s", node)
1497

    
1498
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1499
    """Verify if there are any unknown volumes in the cluster.
1500

1501
    The .os, .swap and backup volumes are ignored. All other volumes are
1502
    reported as unknown.
1503

1504
    @type reserved: L{ganeti.utils.FieldSet}
1505
    @param reserved: a FieldSet of reserved volume names
1506

1507
    """
1508
    for node, n_img in node_image.items():
1509
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1510
        # skip non-healthy nodes
1511
        continue
1512
      for volume in n_img.volumes:
1513
        test = ((node not in node_vol_should or
1514
                volume not in node_vol_should[node]) and
1515
                not reserved.Matches(volume))
1516
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1517
                      "volume %s is unknown", volume)
1518

    
1519
  def _VerifyOrphanInstances(self, instancelist, node_image):
1520
    """Verify the list of running instances.
1521

1522
    This checks what instances are running but unknown to the cluster.
1523

1524
    """
1525
    for node, n_img in node_image.items():
1526
      for o_inst in n_img.instances:
1527
        test = o_inst not in instancelist
1528
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1529
                      "instance %s on node %s should not exist", o_inst, node)
1530

    
1531
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1532
    """Verify N+1 Memory Resilience.
1533

1534
    Check that if one single node dies we can still start all the
1535
    instances it was primary for.
1536

1537
    """
1538
    for node, n_img in node_image.items():
1539
      # This code checks that every node which is now listed as
1540
      # secondary has enough memory to host all instances it is
1541
      # supposed to, should a single other node in the cluster fail.
1542
      # FIXME: not ready for failover to an arbitrary node
1543
      # FIXME: does not support file-backed instances
1544
      # WARNING: we currently take into account down instances as well
1545
      # as up ones, considering that even if they're down someone
1546
      # might want to start them even in the event of a node failure.
1547
      for prinode, instances in n_img.sbp.items():
1548
        needed_mem = 0
1549
        for instance in instances:
1550
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1551
          if bep[constants.BE_AUTO_BALANCE]:
1552
            needed_mem += bep[constants.BE_MEMORY]
1553
        test = n_img.mfree < needed_mem
1554
        self._ErrorIf(test, self.ENODEN1, node,
1555
                      "not enough memory on to accommodate"
1556
                      " failovers should peer node %s fail", prinode)
1557

    
1558
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1559
                       master_files):
1560
    """Verifies and computes the node required file checksums.
1561

1562
    @type ninfo: L{objects.Node}
1563
    @param ninfo: the node to check
1564
    @param nresult: the remote results for the node
1565
    @param file_list: required list of files
1566
    @param local_cksum: dictionary of local files and their checksums
1567
    @param master_files: list of files that only masters should have
1568

1569
    """
1570
    node = ninfo.name
1571
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1572

    
1573
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1574
    test = not isinstance(remote_cksum, dict)
1575
    _ErrorIf(test, self.ENODEFILECHECK, node,
1576
             "node hasn't returned file checksum data")
1577
    if test:
1578
      return
1579

    
1580
    for file_name in file_list:
1581
      node_is_mc = ninfo.master_candidate
1582
      must_have = (file_name not in master_files) or node_is_mc
1583
      # missing
1584
      test1 = file_name not in remote_cksum
1585
      # invalid checksum
1586
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1587
      # existing and good
1588
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1589
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1590
               "file '%s' missing", file_name)
1591
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1592
               "file '%s' has wrong checksum", file_name)
1593
      # not candidate and this is not a must-have file
1594
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1595
               "file '%s' should not exist on non master"
1596
               " candidates (and the file is outdated)", file_name)
1597
      # all good, except non-master/non-must have combination
1598
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1599
               "file '%s' should not exist"
1600
               " on non master candidates", file_name)
1601

    
1602
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1603
                      drbd_map):
1604
    """Verifies and the node DRBD status.
1605

1606
    @type ninfo: L{objects.Node}
1607
    @param ninfo: the node to check
1608
    @param nresult: the remote results for the node
1609
    @param instanceinfo: the dict of instances
1610
    @param drbd_helper: the configured DRBD usermode helper
1611
    @param drbd_map: the DRBD map as returned by
1612
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1613

1614
    """
1615
    node = ninfo.name
1616
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1617

    
1618
    if drbd_helper:
1619
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1620
      test = (helper_result is None)
1621
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1622
               "no drbd usermode helper returned")
1623
      if helper_result:
1624
        status, payload = helper_result
1625
        test = not status
1626
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1627
                 "drbd usermode helper check unsuccessful: %s", payload)
1628
        test = status and (payload != drbd_helper)
1629
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1630
                 "wrong drbd usermode helper: %s", payload)
1631

    
1632
    # compute the DRBD minors
1633
    node_drbd = {}
1634
    for minor, instance in drbd_map[node].items():
1635
      test = instance not in instanceinfo
1636
      _ErrorIf(test, self.ECLUSTERCFG, None,
1637
               "ghost instance '%s' in temporary DRBD map", instance)
1638
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
1641
      if test:
1642
        node_drbd[minor] = (instance, False)
1643
      else:
1644
        instance = instanceinfo[instance]
1645
        node_drbd[minor] = (instance.name, instance.admin_up)
1646

    
1647
    # and now check them
1648
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1649
    test = not isinstance(used_minors, (tuple, list))
1650
    _ErrorIf(test, self.ENODEDRBD, node,
1651
             "cannot parse drbd status file: %s", str(used_minors))
1652
    if test:
1653
      # we cannot check drbd status
1654
      return
1655

    
1656
    for minor, (iname, must_exist) in node_drbd.items():
1657
      test = minor not in used_minors and must_exist
1658
      _ErrorIf(test, self.ENODEDRBD, node,
1659
               "drbd minor %d of instance %s is not active", minor, iname)
1660
    for minor in used_minors:
1661
      test = minor not in node_drbd
1662
      _ErrorIf(test, self.ENODEDRBD, node,
1663
               "unallocated drbd minor %d is in use", minor)
1664

    
1665
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1666
    """Builds the node OS structures.
1667

1668
    @type ninfo: L{objects.Node}
1669
    @param ninfo: the node to check
1670
    @param nresult: the remote results for the node
1671
    @param nimg: the node image object
1672

1673
    """
1674
    node = ninfo.name
1675
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1676

    
1677
    remote_os = nresult.get(constants.NV_OSLIST, None)
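    # each entry in the OS list is expected to be a 7-element list
    # (name, path, status, diagnose message, variants, parameters,
    # API versions), matching the unpacking done further below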
1678
    test = (not isinstance(remote_os, list) or
1679
            not compat.all(isinstance(v, list) and len(v) == 7
1680
                           for v in remote_os))
1681

    
1682
    _ErrorIf(test, self.ENODEOS, node,
1683
             "node hasn't returned valid OS data")
1684

    
1685
    nimg.os_fail = test
1686

    
1687
    if test:
1688
      return
1689

    
1690
    os_dict = {}
1691

    
1692
    for (name, os_path, status, diagnose,
1693
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1694

    
1695
      if name not in os_dict:
1696
        os_dict[name] = []
1697

    
1698
      # parameters is a list of lists instead of list of tuples due to
1699
      # JSON lacking a real tuple type, fix it:
1700
      parameters = [tuple(v) for v in parameters]
1701
      os_dict[name].append((os_path, status, diagnose,
1702
                            set(variants), set(parameters), set(api_ver)))
1703

    
1704
    nimg.oslist = os_dict
1705

    
1706
  def _VerifyNodeOS(self, ninfo, nimg, base):
1707
    """Verifies the node OS list.
1708

1709
    @type ninfo: L{objects.Node}
1710
    @param ninfo: the node to check
1711
    @param nimg: the node image object
1712
    @param base: the 'template' node we match against (e.g. from the master)
1713

1714
    """
1715
    node = ninfo.name
1716
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1717

    
1718
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1719

    
1720
    for os_name, os_data in nimg.oslist.items():
1721
      assert os_data, "Empty OS status for OS %s?!" % os_name
1722
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1723
      _ErrorIf(not f_status, self.ENODEOS, node,
1724
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1725
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1726
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1727
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1728
      # this will be caught in the backend too
1729
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1730
               and not f_var, self.ENODEOS, node,
1731
               "OS %s with API at least %d does not declare any variant",
1732
               os_name, constants.OS_API_V15)
1733
      # comparisons with the 'base' image
1734
      test = os_name not in base.oslist
1735
      _ErrorIf(test, self.ENODEOS, node,
1736
               "Extra OS %s not present on reference node (%s)",
1737
               os_name, base.name)
1738
      if test:
1739
        continue
1740
      assert base.oslist[os_name], "Base node has empty OS status?"
1741
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1742
      if not b_status:
1743
        # base OS is invalid, skipping
1744
        continue
1745
      for kind, a, b in [("API version", f_api, b_api),
1746
                         ("variants list", f_var, b_var),
1747
                         ("parameters", f_param, b_param)]:
1748
        _ErrorIf(a != b, self.ENODEOS, node,
1749
                 "OS %s %s differs from reference node %s: %s vs. %s",
1750
                 kind, os_name, base.name,
1751
                 utils.CommaJoin(a), utils.CommaJoin(b))
1752

    
1753
    # check any missing OSes
1754
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1755
    _ErrorIf(missing, self.ENODEOS, node,
1756
             "OSes present on reference node %s but missing on this node: %s",
1757
             base.name, utils.CommaJoin(missing))
1758

    
1759
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1760
    """Verifies and updates the node volume data.
1761

1762
    This function will update a L{NodeImage}'s internal structures
1763
    with data from the remote call.
1764

1765
    @type ninfo: L{objects.Node}
1766
    @param ninfo: the node to check
1767
    @param nresult: the remote results for the node
1768
    @param nimg: the node image object
1769
    @param vg_name: the configured VG name
1770

1771
    """
1772
    node = ninfo.name
1773
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1774

    
1775
    nimg.lvm_fail = True
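    # assume LVM failure until the LV data below has been parsed successfully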
1776
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1777
    if vg_name is None:
1778
      pass
1779
    elif isinstance(lvdata, basestring):
1780
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1781
               utils.SafeEncode(lvdata))
1782
    elif not isinstance(lvdata, dict):
1783
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1784
    else:
1785
      nimg.volumes = lvdata
1786
      nimg.lvm_fail = False
1787

    
1788
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1789
    """Verifies and updates the node instance list.
1790

1791
    If the listing was successful, then updates this node's instance
1792
    list. Otherwise, it marks the RPC call as failed for the instance
1793
    list key.
1794

1795
    @type ninfo: L{objects.Node}
1796
    @param ninfo: the node to check
1797
    @param nresult: the remote results for the node
1798
    @param nimg: the node image object
1799

1800
    """
1801
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1802
    test = not isinstance(idata, list)
1803
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1804
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1805
    if test:
1806
      nimg.hyp_fail = True
1807
    else:
1808
      nimg.instances = idata
1809

    
1810
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1811
    """Verifies and computes a node information map
1812

1813
    @type ninfo: L{objects.Node}
1814
    @param ninfo: the node to check
1815
    @param nresult: the remote results for the node
1816
    @param nimg: the node image object
1817
    @param vg_name: the configured VG name
1818

1819
    """
1820
    node = ninfo.name
1821
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1822

    
1823
    # try to read free memory (from the hypervisor)
1824
    hv_info = nresult.get(constants.NV_HVINFO, None)
1825
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
1826
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1827
    if not test:
1828
      try:
1829
        nimg.mfree = int(hv_info["memory_free"])
1830
      except (ValueError, TypeError):
1831
        _ErrorIf(True, self.ENODERPC, node,
1832
                 "node returned invalid nodeinfo, check hypervisor")
1833

    
1834
    # FIXME: devise a free space model for file based instances as well
1835
    if vg_name is not None:
1836
      test = (constants.NV_VGLIST not in nresult or
1837
              vg_name not in nresult[constants.NV_VGLIST])
1838
      _ErrorIf(test, self.ENODELVM, node,
1839
               "node didn't return data for the volume group '%s'"
1840
               " - it is either missing or broken", vg_name)
1841
      if not test:
1842
        try:
1843
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
1844
        except (ValueError, TypeError):
1845
          _ErrorIf(True, self.ENODERPC, node,
1846
                   "node returned invalid LVM info, check LVM status")
1847

    
1848
  def BuildHooksEnv(self):
1849
    """Build hooks env.
1850

1851
    Cluster-Verify hooks just run in the post phase, and their failure causes
    the output to be logged in the verify output and the verification to fail.
1853

1854
    """
1855
    all_nodes = self.cfg.GetNodeList()
1856
    env = {
1857
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1858
      }
1859
    for node in self.cfg.GetAllNodesInfo().values():
1860
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1861

    
1862
    return env, [], all_nodes
1863

    
1864
  def Exec(self, feedback_fn):
1865
    """Verify integrity of cluster, performing various test on nodes.
1866

1867
    """
1868
    self.bad = False
1869
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1870
    verbose = self.op.verbose
1871
    self._feedback_fn = feedback_fn
1872
    feedback_fn("* Verifying global settings")
1873
    for msg in self.cfg.VerifyConfig():
1874
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1875

    
1876
    # Check the cluster certificates
1877
    for cert_filename in constants.ALL_CERT_FILES:
1878
      (errcode, msg) = _VerifyCertificate(cert_filename)
1879
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1880

    
1881
    vg_name = self.cfg.GetVGName()
1882
    drbd_helper = self.cfg.GetDRBDHelper()
1883
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1884
    cluster = self.cfg.GetClusterInfo()
1885
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1886
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1887
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1888
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1889
                        for iname in instancelist)
1890
    i_non_redundant = [] # Non redundant instances
1891
    i_non_a_balanced = [] # Non auto-balanced instances
1892
    n_offline = 0 # Count of offline nodes
1893
    n_drained = 0 # Count of nodes being drained
1894
    node_vol_should = {}
1895

    
1896
    # FIXME: verify OS list
1897
    # do local checksums
1898
    master_files = [constants.CLUSTER_CONF_FILE]
1899
    master_node = self.master_node = self.cfg.GetMasterNode()
1900
    master_ip = self.cfg.GetMasterIP()
1901

    
1902
    file_names = ssconf.SimpleStore().GetFileList()
1903
    file_names.extend(constants.ALL_CERT_FILES)
1904
    file_names.extend(master_files)
1905
    if cluster.modify_etc_hosts:
1906
      file_names.append(constants.ETC_HOSTS)
1907

    
1908
    local_checksums = utils.FingerprintFiles(file_names)
1909

    
1910
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1911
    node_verify_param = {
1912
      constants.NV_FILELIST: file_names,
1913
      constants.NV_NODELIST: [node.name for node in nodeinfo
1914
                              if not node.offline],
1915
      constants.NV_HYPERVISOR: hypervisors,
1916
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1917
                                  node.secondary_ip) for node in nodeinfo
1918
                                 if not node.offline],
1919
      constants.NV_INSTANCELIST: hypervisors,
1920
      constants.NV_VERSION: None,
1921
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1922
      constants.NV_NODESETUP: None,
1923
      constants.NV_TIME: None,
1924
      constants.NV_MASTERIP: (master_node, master_ip),
1925
      constants.NV_OSLIST: None,
1926
      }
1927

    
1928
    if vg_name is not None:
1929
      node_verify_param[constants.NV_VGLIST] = None
1930
      node_verify_param[constants.NV_LVLIST] = vg_name
1931
      node_verify_param[constants.NV_PVLIST] = [vg_name]
1932
      node_verify_param[constants.NV_DRBDLIST] = None
1933

    
1934
    if drbd_helper:
1935
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
1936

    
1937
    # Build our expected cluster state
1938
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
1939
                                                 name=node.name))
1940
                      for node in nodeinfo)
1941

    
1942
    for instance in instancelist:
1943
      inst_config = instanceinfo[instance]
1944

    
1945
      for nname in inst_config.all_nodes:
1946
        if nname not in node_image:
1947
          # ghost node
1948
          gnode = self.NodeImage(name=nname)
1949
          gnode.ghost = True
1950
          node_image[nname] = gnode
1951

    
1952
      inst_config.MapLVsByNode(node_vol_should)
1953

    
1954
      pnode = inst_config.primary_node
1955
      node_image[pnode].pinst.append(instance)
1956

    
1957
      for snode in inst_config.secondary_nodes:
1958
        nimg = node_image[snode]
1959
        nimg.sinst.append(instance)
1960
        if pnode not in nimg.sbp:
1961
          nimg.sbp[pnode] = []
1962
        nimg.sbp[pnode].append(instance)
1963

    
1964
    # At this point, we have the in-memory data structures complete,
1965
    # except for the runtime information, which we'll gather next
1966

    
1967
    # Due to the way our RPC system works, exact response times cannot be
1968
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1969
    # time before and after executing the request, we can at least have a time
1970
    # window.
1971
    nvinfo_starttime = time.time()
1972
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1973
                                           self.cfg.GetClusterName())
1974
    nvinfo_endtime = time.time()
1975

    
1976
    all_drbd_map = self.cfg.ComputeDRBDMap()
1977

    
1978
    feedback_fn("* Verifying node status")
1979

    
1980
    refos_img = None
1981

    
1982
    for node_i in nodeinfo:
1983
      node = node_i.name
1984
      nimg = node_image[node]
1985

    
1986
      if node_i.offline:
1987
        if verbose:
1988
          feedback_fn("* Skipping offline node %s" % (node,))
1989
        n_offline += 1
1990
        continue
1991

    
1992
      if node == master_node:
1993
        ntype = "master"
1994
      elif node_i.master_candidate:
1995
        ntype = "master candidate"
1996
      elif node_i.drained:
1997
        ntype = "drained"
1998
        n_drained += 1
1999
      else:
2000
        ntype = "regular"
2001
      if verbose:
2002
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2003

    
2004
      msg = all_nvinfo[node].fail_msg
2005
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2006
      if msg:
2007
        nimg.rpc_fail = True
2008
        continue
2009

    
2010
      nresult = all_nvinfo[node].payload
2011

    
2012
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2013
      self._VerifyNodeNetwork(node_i, nresult)
2014
      self._VerifyNodeLVM(node_i, nresult, vg_name)
2015
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2016
                            master_files)
2017
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2018
                           all_drbd_map)
2019
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2020

    
2021
      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2022
      self._UpdateNodeInstances(node_i, nresult, nimg)
2023
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2024
      self._UpdateNodeOS(node_i, nresult, nimg)
2025
      if not nimg.os_fail:
2026
        if refos_img is None:
2027
          refos_img = nimg
2028
        self._VerifyNodeOS(node_i, nimg, refos_img)
2029

    
2030
    feedback_fn("* Verifying instance status")
2031
    for instance in instancelist:
2032
      if verbose:
2033
        feedback_fn("* Verifying instance %s" % instance)
2034
      inst_config = instanceinfo[instance]
2035
      self._VerifyInstance(instance, inst_config, node_image)
2036
      inst_nodes_offline = []
2037

    
2038
      pnode = inst_config.primary_node
2039
      pnode_img = node_image[pnode]
2040
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2041
               self.ENODERPC, pnode, "instance %s, connection to"
2042
               " primary node failed", instance)
2043

    
2044
      if pnode_img.offline:
2045
        inst_nodes_offline.append(pnode)
2046

    
2047
      # If the instance is non-redundant we cannot survive losing its primary
2048
      # node, so we are not N+1 compliant. On the other hand we have no disk
2049
      # templates with more than one secondary so that situation is not well
2050
      # supported either.
2051
      # FIXME: does not support file-backed instances
2052
      if not inst_config.secondary_nodes:
2053
        i_non_redundant.append(instance)
2054
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2055
               instance, "instance has multiple secondary nodes: %s",
2056
               utils.CommaJoin(inst_config.secondary_nodes),
2057
               code=self.ETYPE_WARNING)
2058

    
2059
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2060
        i_non_a_balanced.append(instance)
2061

    
2062
      for snode in inst_config.secondary_nodes:
2063
        s_img = node_image[snode]
2064
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2065
                 "instance %s, connection to secondary node failed", instance)
2066

    
2067
        if s_img.offline:
2068
          inst_nodes_offline.append(snode)
2069

    
2070
      # warn that the instance lives on offline nodes
2071
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2072
               "instance lives on offline node(s) %s",
2073
               utils.CommaJoin(inst_nodes_offline))
2074
      # ... or ghost nodes
2075
      for node in inst_config.all_nodes:
2076
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2077
                 "instance lives on ghost node %s", node)
2078

    
2079
    feedback_fn("* Verifying orphan volumes")
2080
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2081
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2082

    
2083
    feedback_fn("* Verifying orphan instances")
2084
    self._VerifyOrphanInstances(instancelist, node_image)
2085

    
2086
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2087
      feedback_fn("* Verifying N+1 Memory redundancy")
2088
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2089

    
2090
    feedback_fn("* Other Notes")
2091
    if i_non_redundant:
2092
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2093
                  % len(i_non_redundant))
2094

    
2095
    if i_non_a_balanced:
2096
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2097
                  % len(i_non_a_balanced))
2098

    
2099
    if n_offline:
2100
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2101

    
2102
    if n_drained:
2103
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2104

    
2105
    return not self.bad
2106

    
2107
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2108
    """Analyze the post-hooks' result
2109

2110
    This method analyses the hook result, handles it, and sends some
2111
    nicely-formatted feedback back to the user.
2112

2113
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2114
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2115
    @param hooks_results: the results of the multi-node hooks rpc call
2116
    @param feedback_fn: function used to send feedback back to the caller
2117
    @param lu_result: previous Exec result
2118
    @return: the new Exec result, based on the previous result
2119
        and hook results
2120

2121
    """
2122
    # We only really run POST phase hooks, and are only interested in
2123
    # their results
2124
    if phase == constants.HOOKS_PHASE_POST:
2125
      # Used to change hooks' output to proper indentation
2126
      indent_re = re.compile('^', re.M)
2127
      feedback_fn("* Hooks Results")
2128
      assert hooks_results, "invalid result from hooks"
2129

    
2130
      for node_name in hooks_results:
2131
        res = hooks_results[node_name]
2132
        msg = res.fail_msg
2133
        test = msg and not res.offline
2134
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2135
                      "Communication failure in hooks execution: %s", msg)
2136
        if res.offline or msg:
2137
          # No need to investigate payload if node is offline or gave an error.
2138
          # override manually lu_result here as _ErrorIf only
2139
          # overrides self.bad
2140
          lu_result = 1
2141
          continue
2142
        for script, hkr, output in res.payload:
2143
          test = hkr == constants.HKR_FAIL
2144
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2145
                        "Script %s failed, output:", script)
2146
          if test:
2147
            output = indent_re.sub('      ', output)
2148
            feedback_fn("%s" % output)
2149
            lu_result = 0
2150

    
2151
      return lu_result
2152

    
2153

    
2154
class LUVerifyDisks(NoHooksLU):
2155
  """Verifies the cluster disks status.
2156

2157
  """
2158
  REQ_BGL = False
2159

    
2160
  def ExpandNames(self):
2161
    self.needed_locks = {
2162
      locking.LEVEL_NODE: locking.ALL_SET,
2163
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2164
    }
2165
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2166

    
2167
  def Exec(self, feedback_fn):
2168
    """Verify integrity of cluster disks.
2169

2170
    @rtype: tuple of three items
2171
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)
2174

2175
    """
2176
    result = res_nodes, res_instances, res_missing = {}, [], {}
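    # res_nodes, res_instances and res_missing alias the members of the
    # result tuple, so filling them in below also fills the return value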
2177

    
2178
    vg_name = self.cfg.GetVGName()
2179
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2180
    instances = [self.cfg.GetInstanceInfo(name)
2181
                 for name in self.cfg.GetInstanceList()]
2182

    
2183
    nv_dict = {}
2184
    for inst in instances:
2185
      inst_lvs = {}
2186
      if (not inst.admin_up or
2187
          inst.disk_template not in constants.DTS_NET_MIRROR):
2188
        continue
2189
      inst.MapLVsByNode(inst_lvs)
2190
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
2191
      for node, vol_list in inst_lvs.iteritems():
2192
        for vol in vol_list:
2193
          nv_dict[(node, vol)] = inst
2194

    
2195
    if not nv_dict:
2196
      return result
2197

    
2198
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
2199

    
2200
    for node in nodes:
2201
      # node_volume
2202
      node_res = node_lvs[node]
2203
      if node_res.offline:
2204
        continue
2205
      msg = node_res.fail_msg
2206
      if msg:
2207
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2208
        res_nodes[node] = msg
2209
        continue
2210

    
2211
      lvs = node_res.payload
2212
      for lv_name, (_, _, lv_online) in lvs.items():
2213
        inst = nv_dict.pop((node, lv_name), None)
2214
        if (not lv_online and inst is not None
2215
            and inst.name not in res_instances):
2216
          res_instances.append(inst.name)
2217

    
2218
    # any leftover items in nv_dict are missing LVs, let's arrange the
2219
    # data better
2220
    for key, inst in nv_dict.iteritems():
2221
      if inst.name not in res_missing:
2222
        res_missing[inst.name] = []
2223
      res_missing[inst.name].append(key)
2224

    
2225
    return result
2226

    
2227

    
2228
class LURepairDiskSizes(NoHooksLU):
2229
  """Verifies the cluster disks sizes.
2230

2231
  """
2232
  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
2233
  REQ_BGL = False
2234

    
2235
  def ExpandNames(self):
2236
    if self.op.instances:
2237
      self.wanted_names = []
2238
      for name in self.op.instances:
2239
        full_name = _ExpandInstanceName(self.cfg, name)
2240
        self.wanted_names.append(full_name)
2241
      self.needed_locks = {
2242
        locking.LEVEL_NODE: [],
2243
        locking.LEVEL_INSTANCE: self.wanted_names,
2244
        }
2245
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2246
    else:
2247
      self.wanted_names = None
2248
      self.needed_locks = {
2249
        locking.LEVEL_NODE: locking.ALL_SET,
2250
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2251
        }
2252
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2253

    
2254
  def DeclareLocks(self, level):
2255
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2256
      self._LockInstancesNodes(primary_only=True)
2257

    
2258
  def CheckPrereq(self):
2259
    """Check prerequisites.
2260

2261
    This only checks the optional instance list against the existing names.
2262

2263
    """
2264
    if self.wanted_names is None:
2265
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2266

    
2267
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2268
                             in self.wanted_names]
2269

    
2270
  def _EnsureChildSizes(self, disk):
2271
    """Ensure children of the disk have the needed disk size.
2272

2273
    This is valid mainly for DRBD8 and fixes an issue where the
2274
    children have smaller disk size.
2275

2276
    @param disk: an L{ganeti.objects.Disk} object
2277

2278
    """
2279
    if disk.dev_type == constants.LD_DRBD8:
2280
      assert disk.children, "Empty children for DRBD8?"
2281
      fchild = disk.children[0]
2282
      mismatch = fchild.size < disk.size
2283
      if mismatch:
2284
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2285
                     fchild.size, disk.size)
2286
        fchild.size = disk.size
2287

    
2288
      # and we recurse on this child only, not on the metadev
2289
      return self._EnsureChildSizes(fchild) or mismatch
2290
    else:
2291
      return False
2292

    
2293
  def Exec(self, feedback_fn):
2294
    """Verify the size of cluster disks.
2295

2296
    """
2297
    # TODO: check child disks too
2298
    # TODO: check differences in size between primary/secondary nodes
2299
    per_node_disks = {}
2300
    for instance in self.wanted_instances:
2301
      pnode = instance.primary_node
2302
      if pnode not in per_node_disks:
2303
        per_node_disks[pnode] = []
2304
      for idx, disk in enumerate(instance.disks):
2305
        per_node_disks[pnode].append((instance, idx, disk))
2306

    
2307
    changed = []
2308
    for node, dskl in per_node_disks.items():
2309
      newl = [v[2].Copy() for v in dskl]
2310
      for dsk in newl:
2311
        self.cfg.SetDiskID(dsk, node)
2312
      result = self.rpc.call_blockdev_getsizes(node, newl)
2313
      if result.fail_msg:
2314
        self.LogWarning("Failure in blockdev_getsizes call to node"
2315
                        " %s, ignoring", node)
2316
        continue
2317
      if len(result.data) != len(dskl):
2318
        self.LogWarning("Invalid result from node %s, ignoring node results",
2319
                        node)
2320
        continue
2321
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2322
        if size is None:
2323
          self.LogWarning("Disk %d of instance %s did not return size"
2324
                          " information, ignoring", idx, instance.name)
2325
          continue
2326
        if not isinstance(size, (int, long)):
2327
          self.LogWarning("Disk %d of instance %s did not return valid"
2328
                          " size information, ignoring", idx, instance.name)
2329
          continue
2330
        size = size >> 20
2331
        if size != disk.size:
2332
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2333
                       " correcting: recorded %d, actual %d", idx,
2334
                       instance.name, disk.size, size)
2335
          disk.size = size
2336
          self.cfg.Update(instance, feedback_fn)
2337
          changed.append((instance.name, idx, size))
2338
        if self._EnsureChildSizes(disk):
2339
          self.cfg.Update(instance, feedback_fn)
2340
          changed.append((instance.name, idx, disk.size))
2341
    return changed
2342

    
2343

    
2344
class LURenameCluster(LogicalUnit):
2345
  """Rename the cluster.
2346

2347
  """
2348
  HPATH = "cluster-rename"
2349
  HTYPE = constants.HTYPE_CLUSTER
2350
  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
2351

    
2352
  def BuildHooksEnv(self):
2353
    """Build hooks env.
2354

2355
    """
2356
    env = {
2357
      "OP_TARGET": self.cfg.GetClusterName(),
2358
      "NEW_NAME": self.op.name,
2359
      }
2360
    mn = self.cfg.GetMasterNode()
2361
    all_nodes = self.cfg.GetNodeList()
2362
    return env, [mn], all_nodes
2363

    
2364
  def CheckPrereq(self):
2365
    """Verify that the passed name is a valid one.
2366

2367
    """
2368
    hostname = netutils.GetHostname(name=self.op.name,
2369
                                    family=self.cfg.GetPrimaryIPFamily())
2370

    
2371
    new_name = hostname.name
2372
    self.ip = new_ip = hostname.ip
2373
    old_name = self.cfg.GetClusterName()
2374
    old_ip = self.cfg.GetMasterIP()
2375
    if new_name == old_name and new_ip == old_ip:
2376
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2377
                                 " cluster has changed",
2378
                                 errors.ECODE_INVAL)
2379
    if new_ip != old_ip:
2380
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2381
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2382
                                   " reachable on the network" %
2383
                                   new_ip, errors.ECODE_NOTUNIQUE)
2384

    
2385
    self.op.name = new_name
2386

    
2387
  def Exec(self, feedback_fn):
2388
    """Rename the cluster.
2389

2390
    """
2391
    clustername = self.op.name
2392
    ip = self.ip
2393

    
2394
    # shutdown the master IP
2395
    master = self.cfg.GetMasterNode()
2396
    result = self.rpc.call_node_stop_master(master, False)
2397
    result.Raise("Could not disable the master role")
2398

    
2399
    try:
2400
      cluster = self.cfg.GetClusterInfo()
2401
      cluster.cluster_name = clustername
2402
      cluster.master_ip = ip
2403
      self.cfg.Update(cluster, feedback_fn)
2404

    
2405
      # update the known hosts file
2406
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2407
      node_list = self.cfg.GetNodeList()
2408
      try:
2409
        node_list.remove(master)
2410
      except ValueError:
2411
        pass
2412
      result = self.rpc.call_upload_file(node_list,
2413
                                         constants.SSH_KNOWN_HOSTS_FILE)
2414
      for to_node, to_result in result.iteritems():
2415
        msg = to_result.fail_msg
2416
        if msg:
2417
          msg = ("Copy of file %s to node %s failed: %s" %
2418
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
2419
          self.proc.LogWarning(msg)
2420

    
2421
    finally:
2422
      result = self.rpc.call_node_start_master(master, False, False)
2423
      msg = result.fail_msg
2424
      if msg:
2425
        self.LogWarning("Could not re-enable the master role on"
2426
                        " the master, please restart manually: %s", msg)
2427

    
2428
    return clustername
2429

    
2430

    
2431
class LUSetClusterParams(LogicalUnit):
2432
  """Change the parameters of the cluster.
2433

2434
  """
2435
  HPATH = "cluster-modify"
2436
  HTYPE = constants.HTYPE_CLUSTER
2437
  _OP_PARAMS = [
2438
    ("vg_name", None, ht.TMaybeString),
2439
    ("enabled_hypervisors", None,
2440
     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
2441
            ht.TNone)),
2442
    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2443
                              ht.TNone)),
2444
    ("beparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2445
                              ht.TNone)),
2446
    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2447
                            ht.TNone)),
2448
    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2449
                              ht.TNone)),
2450
    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
2451
    ("uid_pool", None, ht.NoType),
2452
    ("add_uids", None, ht.NoType),
2453
    ("remove_uids", None, ht.NoType),
2454
    ("maintain_node_health", None, ht.TMaybeBool),
2455
    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
2456
    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
2457
    ("default_iallocator", None, ht.TMaybeString),
2458
    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
2459
    ("hidden_os", None, ht.TOr(ht.TListOf(\
2460
          ht.TAnd(ht.TList,
2461
                ht.TIsLength(2),
2462
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2463
          ht.TNone)),
2464
    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
2465
          ht.TAnd(ht.TList,
2466
                ht.TIsLength(2),
2467
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2468
          ht.TNone)),
2469
    ]
2470
  REQ_BGL = False
2471

    
2472
  def CheckArguments(self):
2473
    """Check parameters
2474

2475
    """
2476
    if self.op.uid_pool:
2477
      uidpool.CheckUidPool(self.op.uid_pool)
2478

    
2479
    if self.op.add_uids:
2480
      uidpool.CheckUidPool(self.op.add_uids)
2481

    
2482
    if self.op.remove_uids:
2483
      uidpool.CheckUidPool(self.op.remove_uids)
2484

    
2485
  def ExpandNames(self):
2486
    # FIXME: in the future maybe other cluster params won't require checking on
2487
    # all nodes to be modified.
2488
    self.needed_locks = {
2489
      locking.LEVEL_NODE: locking.ALL_SET,
2490
    }
2491
    self.share_locks[locking.LEVEL_NODE] = 1
2492

    
2493
  def BuildHooksEnv(self):
2494
    """Build hooks env.
2495

2496
    """
2497
    env = {
2498
      "OP_TARGET": self.cfg.GetClusterName(),
2499
      "NEW_VG_NAME": self.op.vg_name,
2500
      }
2501
    mn = self.cfg.GetMasterNode()
2502
    return env, [mn], [mn]
2503

    
2504
  def CheckPrereq(self):
2505
    """Check prerequisites.
2506

2507
    This checks whether the given params don't conflict and
2508
    if the given volume group is valid.
2509

2510
    """
2511
    if self.op.vg_name is not None and not self.op.vg_name:
2512
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
2513
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
2514
                                   " instances exist", errors.ECODE_INVAL)
2515

    
2516
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
2517
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
2518
        raise errors.OpPrereqError("Cannot disable drbd helper while"
2519
                                   " drbd-based instances exist",
2520
                                   errors.ECODE_INVAL)
2521

    
2522
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2523

    
2524
    # if vg_name not None, checks given volume group on all nodes
2525
    if self.op.vg_name:
2526
      vglist = self.rpc.call_vg_list(node_list)
2527
      for node in node_list:
2528
        msg = vglist[node].fail_msg
2529
        if msg:
2530
          # ignoring down node
2531
          self.LogWarning("Error while gathering data on node %s"
2532
                          " (ignoring node): %s", node, msg)
2533
          continue
2534
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2535
                                              self.op.vg_name,
2536
                                              constants.MIN_VG_SIZE)
2537
        if vgstatus:
2538
          raise errors.OpPrereqError("Error on node '%s': %s" %
2539
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2540

    
2541
    if self.op.drbd_helper:
2542
      # checks given drbd helper on all nodes
2543
      helpers = self.rpc.call_drbd_helper(node_list)
2544
      for node in node_list:
2545
        ninfo = self.cfg.GetNodeInfo(node)
2546
        if ninfo.offline:
2547
          self.LogInfo("Not checking drbd helper on offline node %s", node)
2548
          continue
2549
        msg = helpers[node].fail_msg
2550
        if msg:
2551
          raise errors.OpPrereqError("Error checking drbd helper on node"
2552
                                     " '%s': %s" % (node, msg),
2553
                                     errors.ECODE_ENVIRON)
2554
        node_helper = helpers[node].payload
2555
        if node_helper != self.op.drbd_helper:
2556
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
2557
                                     (node, node_helper), errors.ECODE_ENVIRON)
2558

    
2559
    self.cluster = cluster = self.cfg.GetClusterInfo()
2560
    # validate params changes
2561
    if self.op.beparams:
2562
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2563
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2564

    
2565
    if self.op.nicparams:
2566
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2567
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2568
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2569
      nic_errors = []
2570

    
2571
      # check all instances for consistency
2572
      for instance in self.cfg.GetAllInstancesInfo().values():
2573
        for nic_idx, nic in enumerate(instance.nics):
2574
          params_copy = copy.deepcopy(nic.nicparams)
2575
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2576

    
2577
          # check parameter syntax
2578
          try:
2579
            objects.NIC.CheckParameterSyntax(params_filled)
2580
          except errors.ConfigurationError, err:
2581
            nic_errors.append("Instance %s, nic/%d: %s" %
2582
                              (instance.name, nic_idx, err))
2583

    
2584
          # if we're moving instances to routed, check that they have an ip
2585
          target_mode = params_filled[constants.NIC_MODE]
2586
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2587
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2588
                              (instance.name, nic_idx))
2589
      if nic_errors:
2590
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2591
                                   "\n".join(nic_errors))
2592

    
2593
    # hypervisor list/parameters
2594
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
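    # FillDict with an empty override effectively gives us a copy of the
    # current hypervisor parameters that can be modified here without
    # touching the cluster object until Exec runs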
2595
    if self.op.hvparams:
2596
      for hv_name, hv_dict in self.op.hvparams.items():
2597
        if hv_name not in self.new_hvparams:
2598
          self.new_hvparams[hv_name] = hv_dict
2599
        else:
2600
          self.new_hvparams[hv_name].update(hv_dict)
2601

    
2602
    # os hypervisor parameters
2603
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2604
    if self.op.os_hvp:
2605
      for os_name, hvs in self.op.os_hvp.items():
2606
        if os_name not in self.new_os_hvp:
2607
          self.new_os_hvp[os_name] = hvs
2608
        else:
2609
          for hv_name, hv_dict in hvs.items():
2610
            if hv_name not in self.new_os_hvp[os_name]:
2611
              self.new_os_hvp[os_name][hv_name] = hv_dict
2612
            else:
2613
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2614

    
2615
    # os parameters
2616
    self.new_osp = objects.FillDict(cluster.osparams, {})
2617
    if self.op.osparams:
2618
      for os_name, osp in self.op.osparams.items():
2619
        if os_name not in self.new_osp:
2620
          self.new_osp[os_name] = {}
2621

    
2622
        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
2623
                                                  use_none=True)
2624

    
2625
        if not self.new_osp[os_name]:
2626
          # we removed all parameters
2627
          del self.new_osp[os_name]
2628
        else:
2629
          # check the parameter validity (remote check)
2630
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
2631
                         os_name, self.new_osp[os_name])
2632

    
2633
    # changes to the hypervisor list
2634
    if self.op.enabled_hypervisors is not None:
2635
      self.hv_list = self.op.enabled_hypervisors
2636
      for hv in self.hv_list:
2637
        # if the hypervisor doesn't already exist in the cluster
2638
        # hvparams, we initialize it to empty, and then (in both
2639
        # cases) we make sure to fill the defaults, as we might not
2640
        # have a complete defaults list if the hypervisor wasn't
2641
        # enabled before
2642
        if hv not in new_hvp:
2643
          new_hvp[hv] = {}
2644
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
2645
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
2646
    else:
2647
      self.hv_list = cluster.enabled_hypervisors
2648

    
2649
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2650
      # either the enabled list has changed, or the parameters have, validate
2651
      for hv_name, hv_params in self.new_hvparams.items():
2652
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2653
            (self.op.enabled_hypervisors and
2654
             hv_name in self.op.enabled_hypervisors)):
2655
          # either this is a new hypervisor, or its parameters have changed
2656
          hv_class = hypervisor.GetHypervisor(hv_name)
2657
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2658
          hv_class.CheckParameterSyntax(hv_params)
2659
          _CheckHVParams(self, node_list, hv_name, hv_params)
2660

    
2661
    if self.op.os_hvp:
2662
      # no need to check any newly-enabled hypervisors, since the
2663
      # defaults have already been checked in the above code-block
2664
      for os_name, os_hvp in self.new_os_hvp.items():
2665
        for hv_name, hv_params in os_hvp.items():
2666
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2667
          # we need to fill in the new os_hvp on top of the actual hv_p
2668
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2669
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2670
          hv_class = hypervisor.GetHypervisor(hv_name)
2671
          hv_class.CheckParameterSyntax(new_osp)
2672
          _CheckHVParams(self, node_list, hv_name, new_osp)
2673

    
2674
    if self.op.default_iallocator:
2675
      alloc_script = utils.FindFile(self.op.default_iallocator,
2676
                                    constants.IALLOCATOR_SEARCH_PATH,
2677
                                    os.path.isfile)
2678
      if alloc_script is None:
2679
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
2680
                                   " specified" % self.op.default_iallocator,
2681
                                   errors.ECODE_INVAL)
2682

    
2683
  def Exec(self, feedback_fn):
2684
    """Change the parameters of the cluster.
2685

2686
    """
2687
    if self.op.vg_name is not None:
2688
      new_volume = self.op.vg_name
2689
      if not new_volume:
2690
        new_volume = None
2691
      if new_volume != self.cfg.GetVGName():
2692
        self.cfg.SetVGName(new_volume)
2693
      else:
2694
        feedback_fn("Cluster LVM configuration already in desired"
2695
                    " state, not changing")
2696
    if self.op.drbd_helper is not None:
2697
      new_helper = self.op.drbd_helper
2698
      if not new_helper:
2699
        new_helper = None
2700
      if new_helper != self.cfg.GetDRBDHelper():
2701
        self.cfg.SetDRBDHelper(new_helper)
2702
      else:
2703
        feedback_fn("Cluster DRBD helper already in desired state,"
2704
                    " not changing")
2705
    if self.op.hvparams:
2706
      self.cluster.hvparams = self.new_hvparams
2707
    if self.op.os_hvp:
2708
      self.cluster.os_hvp = self.new_os_hvp
2709
    if self.op.enabled_hypervisors is not None:
2710
      self.cluster.hvparams = self.new_hvparams
2711
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2712
    if self.op.beparams:
2713
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2714
    if self.op.nicparams:
2715
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2716
    if self.op.osparams:
2717
      self.cluster.osparams = self.new_osp
2718

    
2719
    if self.op.candidate_pool_size is not None:
2720
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2721
      # we need to update the pool size here, otherwise the save will fail
2722
      _AdjustCandidatePool(self, [])
2723

    
2724
    if self.op.maintain_node_health is not None:
2725
      self.cluster.maintain_node_health = self.op.maintain_node_health
2726

    
2727
    if self.op.add_uids is not None:
2728
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
2729

    
2730
    if self.op.remove_uids is not None:
2731
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
2732

    
2733
    if self.op.uid_pool is not None:
2734
      self.cluster.uid_pool = self.op.uid_pool
2735

    
2736
    if self.op.default_iallocator is not None:
2737
      self.cluster.default_iallocator = self.op.default_iallocator
2738

    
2739
    if self.op.reserved_lvs is not None:
2740
      self.cluster.reserved_lvs = self.op.reserved_lvs
2741

    
2742
    def helper_os(aname, mods, desc):
2743
      desc += " OS list"
2744
      lst = getattr(self.cluster, aname)
2745
      for key, val in mods:
2746
        if key == constants.DDM_ADD:
2747
          if val in lst:
2748
            feedback_fn("OS %s already in %s, ignoring", val, desc)
2749
          else:
2750
            lst.append(val)
2751
        elif key == constants.DDM_REMOVE:
2752
          if val in lst:
2753
            lst.remove(val)
2754
          else:
2755
            feedback_fn("OS %s not found in %s, ignoring", val, desc)
2756
        else:
2757
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
2758

    
2759
    if self.op.hidden_os:
2760
      helper_os("hidden_os", self.op.hidden_os, "hidden")
2761

    
2762
    if self.op.blacklisted_os:
2763
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
2764

    
2765
    self.cfg.Update(self.cluster, feedback_fn)
2766

    
2767

    
2768
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2769
  """Distribute additional files which are part of the cluster configuration.
2770

2771
  ConfigWriter takes care of distributing the config and ssconf files, but
2772
  there are more files which should be distributed to all nodes. This function
2773
  makes sure those are copied.
2774

2775
  @param lu: calling logical unit
2776
  @param additional_nodes: list of nodes not in the config to distribute to
2777

2778
  """
2779
  # 1. Gather target nodes
2780
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2781
  dist_nodes = lu.cfg.GetOnlineNodeList()
2782
  if additional_nodes is not None:
2783
    dist_nodes.extend(additional_nodes)
2784
  if myself.name in dist_nodes:
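    # no need to re-upload anything to the master node itself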
2785
    dist_nodes.remove(myself.name)
2786

    
2787
  # 2. Gather files to distribute
2788
  dist_files = set([constants.ETC_HOSTS,
2789
                    constants.SSH_KNOWN_HOSTS_FILE,
2790
                    constants.RAPI_CERT_FILE,
2791
                    constants.RAPI_USERS_FILE,
2792
                    constants.CONFD_HMAC_KEY,
2793
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
2794
                   ])
2795

    
2796
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2797
  for hv_name in enabled_hypervisors:
2798
    hv_class = hypervisor.GetHypervisor(hv_name)
2799
    dist_files.update(hv_class.GetAncillaryFiles())
2800

    
2801
  # 3. Perform the files upload
2802
  for fname in dist_files:
2803
    if os.path.exists(fname):
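      # optional files (for example the RAPI users file) may legitimately be
      # absent on this cluster, so only push the ones that actually exist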
2804
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2805
      for to_node, to_result in result.items():
2806
        msg = to_result.fail_msg
2807
        if msg:
2808
          msg = ("Copy of file %s to node %s failed: %s" %
2809
                 (fname, to_node, msg))
2810
          lu.proc.LogWarning(msg)
2811

    
2812

    
2813
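# Example (illustrative): a node that is being added is not yet part of the
# configuration, so callers pass it explicitly; LUAddNode below does roughly
#
#   _RedistributeAncillaryFiles(self, additional_nodes=[node])
#
# while a plain redistribution (LURedistributeConfig) calls it with no extra
# nodes.

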
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


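# Example (illustrative): callers typically poll until the mirrors settle and
# treat a degraded result as fatal, e.g. something along the lines of
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Disks are degraded after the sync wait")
#
# With oneshot=True the loop performs a single status round (apart from the
# short degraded-state retries) instead of blocking until completion.

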
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


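# Example (illustrative, secondary_node is a hypothetical variable): checking
# only the local storage of a device on its secondary node, rather than the
# overall mirror status, could look like
#
#   ok = _CheckDiskConsistency(self, dev, secondary_node,
#                              on_primary=False, ldisk=True)
#
# Note that the recursion into dev.children above deliberately re-checks the
# overall status (ldisk is not propagated to the children).

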
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _HID = "hidden"
  _BLK = "blacklisted"
  _VLD = "valid"
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
                                   "parameters", "api_versions", _HID, _BLK)

  def CheckArguments(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []
    cluster = self.cfg.GetClusterInfo()

    for os_name in utils.NiceSort(pol.keys()):
      os_data = pol[os_name]
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      is_hid = os_name in cluster.hidden_os
      is_blk = os_name in cluster.blacklisted_os
      if ((self._HID not in self.op.output_fields and is_hid) or
          (self._BLK not in self.op.output_fields and is_blk) or
          (self._VLD not in self.op.output_fields and not valid)):
        continue

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == self._VLD:
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = utils.NiceSort(list(variants))
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        elif field == self._HID:
          val = is_hid
        elif field == self._BLK:
          val = is_blk
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("use_locking", False, ht.TBool),
    ]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


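# Example (illustrative): with output_fields = ["name", "role", "mfree"] the
# Exec() above returns one row per node, in the same field order, e.g.
#
#   [["node1.example.com", "M", 2048],
#    ["node2.example.com", "R", 4096]]
#
# where a dynamic field such as "mfree" is None when the node daemon did not
# report live data.

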
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_PARAMS = [
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  _OP_PARAMS = [
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ("name", None, ht.TMaybeString),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


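# Example (illustrative; the SF_* field names besides SF_NAME/SF_NODE/SF_TYPE
# are assumed to be part of constants.VALID_STORAGE_FIELDS): querying free
# space per LVM volume group might use something like
#
#   storage_type = constants.ST_LVM_VG
#   output_fields = [constants.SF_NAME, constants.SF_SIZE, constants.SF_FREE]
#
# The LU fills in the per-row node and type values itself, which is why
# SF_NODE/SF_TYPE are stripped from the RPC field list above.

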
class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_PARAMS = [
    _PNodeName,
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("name", ht.NoDefault, ht.TNonEmptyString),
    ("changes", ht.NoDefault, ht.TDict),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


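# Example (illustrative, assuming constants.SF_ALLOCATABLE is one of the
# modifiable fields for LVM physical volumes): an opcode toggling the
# allocatability of a PV would carry something like
#
#   storage_type = constants.ST_LVM_PV
#   name = "/dev/sdb1"
#   changes = {constants.SF_ALLOCATABLE: False}
#
# Any key not listed in constants.MODIFIABLE_STORAGE_FIELDS[storage_type] is
# rejected in CheckArguments above.

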
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("primary_ip", None, ht.NoType),
    ("secondary_ip", None, ht.TMaybeString),
    ("readd", False, ht.TBool),
    ("nodegroup", None, ht.TMaybeString)
    ]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name
    if self.op.readd and self.op.nodegroup:
      raise errors.OpPrereqError("Cannot pass a nodegroup when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      nodegroup = cfg.LookupNodeGroup(self.op.nodegroup)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   nodegroup=nodegroup)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_ADD,
                                              self.hostname.name,
                                              self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


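# Example (illustrative): re-adding a previously removed or broken node reuses
# its existing configuration entry, so roughly
#
#   gnt-node add --readd node3.example.com
#
# resets the offline/drained flags and redistributes the ancillary files,
# whereas a plain add also creates the objects.Node entry and registers it
# with the cluster context.

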
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("master_candidate", None, ht.TMaybeBool),
    ("offline", None, ht.TMaybeBool),
    ("drained", None, ht.TMaybeBool),
    ("auto_promote", False, ht.TBool),
    _PForce,
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

    self.lock_all = self.op.auto_promote and self.might_demote

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


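# Example (illustrative): at most one of offline/drained/master_candidate may
# be set to True in a single modification, so a request carrying
#
#   offline = True
#   drained = True
#
# is rejected in CheckArguments, while e.g. drained=True on its own (or
# offline=False to bring a node back) is accepted and Exec() returns the list
# of (parameter, new value) pairs that were applied.

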
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_PARAMS = [
    _PNodeName,
    _PForce,
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], p