root / lib / cmdlib.py @ 4edc512c

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have way too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42

    
43
from ganeti import ssh
44
from ganeti import utils
45
from ganeti import errors
46
from ganeti import hypervisor
47
from ganeti import locking
48
from ganeti import constants
49
from ganeti import objects
50
from ganeti import serializer
51
from ganeti import ssconf
52
from ganeti import uidpool
53
from ganeti import compat
54
from ganeti import masterd
55
from ganeti import netutils
56
from ganeti import ht
57
from ganeti import query
58
from ganeti import qlang
59

    
60
import ganeti.masterd.instance # pylint: disable-msg=W0611
61

    
62
# Common opcode attributes
63

    
64
#: output fields for a query operation
65
_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))
66

    
67

    
68
#: the shutdown timeout
69
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
70
                     ht.TPositiveInt)
71

    
72
#: the force parameter
73
_PForce = ("force", False, ht.TBool)
74

    
75
#: a required instance name (for single-instance LUs)
76
_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)
77

    
78
#: Whether to ignore offline nodes
79
_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)
80

    
81
#: a required node name (for single-node LUs)
82
_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)
83

    
84
#: the migration type (live/non-live)
85
_PMigrationMode = ("mode", None,
86
                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))
87

    
88
#: the obsolete 'live' mode (boolean)
89
_PMigrationLive = ("live", None, ht.TMaybeBool)
90

    
91

    
92
# End types
93
class LogicalUnit(object):
94
  """Logical Unit base class.
95

96
  Subclasses must follow these rules:
97
    - implement ExpandNames
98
    - implement CheckPrereq (except when tasklets are used)
99
    - implement Exec (except when tasklets are used)
100
    - implement BuildHooksEnv
101
    - redefine HPATH and HTYPE
102
    - optionally redefine their run requirements:
103
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
104

105
  Note that all commands require root permissions.
106

107
  @ivar dry_run_result: the value (if any) that will be returned to the caller
108
      in dry-run mode (signalled by opcode dry_run parameter)
109
  @cvar _OP_PARAMS: a list of opcode attributes, the default values
110
      they should get if not already defined, and types they must match
111

112
  """
113
  HPATH = None
114
  HTYPE = None
115
  _OP_PARAMS = []
116
  REQ_BGL = True
117

    
118
  def __init__(self, processor, op, context, rpc):
119
    """Constructor for LogicalUnit.
120

121
    This needs to be overridden in derived classes in order to check op
122
    validity.
123

124
    """
125
    self.proc = processor
126
    self.op = op
127
    self.cfg = context.cfg
128
    self.context = context
129
    self.rpc = rpc
130
    # Dicts used to declare locking needs to mcpu
131
    self.needed_locks = None
132
    self.acquired_locks = {}
133
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
134
    self.add_locks = {}
135
    self.remove_locks = {}
136
    # Used to force good behavior when calling helper functions
137
    self.recalculate_locks = {}
138
    self.__ssh = None
139
    # logging
140
    self.Log = processor.Log # pylint: disable-msg=C0103
141
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
142
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
143
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
144
    # support for dry-run
145
    self.dry_run_result = None
146
    # support for generic debug attribute
147
    if (not hasattr(self.op, "debug_level") or
148
        not isinstance(self.op.debug_level, int)):
149
      self.op.debug_level = 0
150

    
151
    # Tasklets
152
    self.tasklets = None
153

    
154
    # The new kind-of-type-system
155
    op_id = self.op.OP_ID
156
    for attr_name, aval, test in self._OP_PARAMS:
157
      if not hasattr(op, attr_name):
158
        if aval == ht.NoDefault:
159
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
160
                                     (op_id, attr_name), errors.ECODE_INVAL)
161
        else:
162
          if callable(aval):
163
            dval = aval()
164
          else:
165
            dval = aval
166
          setattr(self.op, attr_name, dval)
167
      attr_val = getattr(op, attr_name)
168
      if test == ht.NoType:
169
        # no tests here
170
        continue
171
      if not callable(test):
172
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
173
                                     " given type is not a proper type (%s)" %
174
                                     (op_id, attr_name, test))
175
      if not test(attr_val):
176
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
177
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
178
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
179
                                   (op_id, attr_name), errors.ECODE_INVAL)
180

    
181
    self.CheckArguments()
182

    
183
  def __GetSSH(self):
184
    """Returns the SshRunner object
185

186
    """
187
    if not self.__ssh:
188
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
189
    return self.__ssh
190

    
191
  ssh = property(fget=__GetSSH)
192

    
193
  def CheckArguments(self):
194
    """Check syntactic validity for the opcode arguments.
195

196
    This method is for doing a simple syntactic check and ensuring
197
    validity of opcode parameters, without any cluster-related
198
    checks. While the same can be accomplished in ExpandNames and/or
199
    CheckPrereq, doing these separate is better because:
200

201
      - ExpandNames is left as purely a lock-related function
202
      - CheckPrereq is run after we have acquired locks (and possibly
203
        waited for them)
204

205
    The function is allowed to change the self.op attribute so that
206
    later methods no longer need to worry about missing parameters.
207

208
    """
209
    pass
210

    
211
  def ExpandNames(self):
212
    """Expand names for this LU.
213

214
    This method is called before starting to execute the opcode, and it should
215
    update all the parameters of the opcode to their canonical form (e.g. a
216
    short node name must be fully expanded after this method has successfully
217
    completed). This way locking, hooks, logging, etc. can work correctly.
218

219
    LUs which implement this method must also populate the self.needed_locks
220
    member, as a dict with lock levels as keys, and a list of needed lock names
221
    as values. Rules:
222

223
      - use an empty dict if you don't need any lock
224
      - if you don't need any lock at a particular level omit that level
225
      - don't put anything for the BGL level
226
      - if you want all locks at a level use locking.ALL_SET as a value
227

228
    If you need to share locks (rather than acquire them exclusively) at one
229
    level you can modify self.share_locks, setting a true value (usually 1) for
230
    that level. By default locks are not shared.
231

232
    This function can also define a list of tasklets, which then will be
233
    executed in order instead of the usual LU-level CheckPrereq and Exec
234
    functions, if those are not defined by the LU.
235

236
    Examples::
237

238
      # Acquire all nodes and one instance
239
      self.needed_locks = {
240
        locking.LEVEL_NODE: locking.ALL_SET,
241
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
242
      }
243
      # Acquire just two nodes
244
      self.needed_locks = {
245
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
246
      }
247
      # Acquire no locks
248
      self.needed_locks = {} # No, you can't leave it to the default value None
249

250
    """
251
    # The implementation of this method is mandatory only if the new LU is
252
    # concurrent, so that old LUs don't need to be changed all at the same
253
    # time.
254
    if self.REQ_BGL:
255
      self.needed_locks = {} # Exclusive LUs don't need locks.
256
    else:
257
      raise NotImplementedError
258

    
259
  def DeclareLocks(self, level):
260
    """Declare LU locking needs for a level
261

262
    While most LUs can just declare their locking needs at ExpandNames time,
263
    sometimes there's the need to calculate some locks after having acquired
264
    the ones before. This function is called just before acquiring locks at a
265
    particular level, but after acquiring the ones at lower levels, and permits
266
    such calculations. It can be used to modify self.needed_locks, and by
267
    default it does nothing.
268

269
    This function is only called if you have something already set in
270
    self.needed_locks for the level.
271

272
    @param level: Locking level which is going to be locked
273
    @type level: member of ganeti.locking.LEVELS
274

275
    """
276

    
277
  def CheckPrereq(self):
278
    """Check prerequisites for this LU.
279

280
    This method should check that the prerequisites for the execution
281
    of this LU are fulfilled. It can do internode communication, but
282
    it should be idempotent - no cluster or system changes are
283
    allowed.
284

285
    The method should raise errors.OpPrereqError in case something is
286
    not fulfilled. Its return value is ignored.
287

288
    This method should also update all the parameters of the opcode to
289
    their canonical form if it hasn't been done by ExpandNames before.
290

291
    """
292
    if self.tasklets is not None:
293
      for (idx, tl) in enumerate(self.tasklets):
294
        logging.debug("Checking prerequisites for tasklet %s/%s",
295
                      idx + 1, len(self.tasklets))
296
        tl.CheckPrereq()
297
    else:
298
      pass
299

    
300
  def Exec(self, feedback_fn):
301
    """Execute the LU.
302

303
    This method should implement the actual work. It should raise
304
    errors.OpExecError for failures that are somewhat dealt with in
305
    code, or expected.
306

307
    """
308
    if self.tasklets is not None:
309
      for (idx, tl) in enumerate(self.tasklets):
310
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
311
        tl.Exec(feedback_fn)
312
    else:
313
      raise NotImplementedError
314

    
315
  def BuildHooksEnv(self):
316
    """Build hooks environment for this LU.
317

318
    This method should return a three-node tuple consisting of: a dict
319
    containing the environment that will be used for running the
320
    specific hook for this LU, a list of node names on which the hook
321
    should run before the execution, and a list of node names on which
322
    the hook should run after the execution.
323

324
    The keys of the dict must not have 'GANETI_' prefixed as this will
325
    be handled in the hooks runner. Also note additional keys will be
326
    added by the hooks runner. If the LU doesn't define any
327
    environment, an empty dict (and not None) should be returned.
328

329
    No nodes should be returned as an empty list (and not None).
330

331
    Note that if the HPATH for a LU class is None, this function will
332
    not be called.
333

334
    """
335
    raise NotImplementedError
336

    
337
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
338
    """Notify the LU about the results of its hooks.
339

340
    This method is called every time a hooks phase is executed, and notifies
341
    the Logical Unit about the hooks' result. The LU can then use it to alter
342
    its result based on the hooks.  By default the method does nothing and the
343
    previous result is passed back unchanged but any LU can define it if it
344
    wants to use the local cluster hook-scripts somehow.
345

346
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
347
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
348
    @param hook_results: the results of the multi-node hooks rpc call
349
    @param feedback_fn: function used to send feedback back to the caller
350
    @param lu_result: the previous Exec result this LU had, or None
351
        in the PRE phase
352
    @return: the new Exec result, based on the previous result
353
        and hook results
354

355
    """
356
    # API must be kept, thus we ignore the unused argument and could
357
    # be a function warnings
358
    # pylint: disable-msg=W0613,R0201
359
    return lu_result
360

    
361
  def _ExpandAndLockInstance(self):
362
    """Helper function to expand and lock an instance.
363

364
    Many LUs that work on an instance take its name in self.op.instance_name
365
    and need to expand it and then declare the expanded name for locking. This
366
    function does it, and then updates self.op.instance_name to the expanded
367
    name. It also initializes needed_locks as a dict, if this hasn't been done
368
    before.
369

370
    """
371
    if self.needed_locks is None:
372
      self.needed_locks = {}
373
    else:
374
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
375
        "_ExpandAndLockInstance called with instance-level locks set"
376
    self.op.instance_name = _ExpandInstanceName(self.cfg,
377
                                                self.op.instance_name)
378
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
379

    
380
  def _LockInstancesNodes(self, primary_only=False):
381
    """Helper function to declare instances' nodes for locking.
382

383
    This function should be called after locking one or more instances to lock
384
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
385
    with all primary or secondary nodes for instances already locked and
386
    present in self.needed_locks[locking.LEVEL_INSTANCE].
387

388
    It should be called from DeclareLocks, and for safety only works if
389
    self.recalculate_locks[locking.LEVEL_NODE] is set.
390

391
    In the future it may grow parameters to just lock some instance's nodes, or
392
    to just lock primaries or secondary nodes, if needed.
393

394
    It should be called in DeclareLocks in a way similar to::
395

396
      if level == locking.LEVEL_NODE:
397
        self._LockInstancesNodes()
398

399
    @type primary_only: boolean
400
    @param primary_only: only lock primary nodes of locked instances
401

402
    """
403
    assert locking.LEVEL_NODE in self.recalculate_locks, \
404
      "_LockInstancesNodes helper function called with no nodes to recalculate"
405

    
406
    # TODO: check if we've really been called with the instance locks held
407

    
408
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
409
    # future we might want to have different behaviors depending on the value
410
    # of self.recalculate_locks[locking.LEVEL_NODE]
411
    wanted_nodes = []
412
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
413
      instance = self.context.cfg.GetInstanceInfo(instance_name)
414
      wanted_nodes.append(instance.primary_node)
415
      if not primary_only:
416
        wanted_nodes.extend(instance.secondary_nodes)
417

    
418
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
419
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
420
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
421
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
422

    
423
    del self.recalculate_locks[locking.LEVEL_NODE]
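
# Illustrative sketch only: a minimal, hypothetical LU showing how the
# _OP_PARAMS declarations above feed the validation loop in
# LogicalUnit.__init__.  The "message" attribute and this class are
# examples, not part of the real opcode set.
class _LUExampleNoop(LogicalUnit):
  """Example-only LU: echoes an optional message back to the caller."""
  _OP_PARAMS = [
    ("message", "", ht.TString),  # optional, defaults to ""
    _PForce,                      # reuse of a shared parameter definition
    ]
  REQ_BGL = False

  def ExpandNames(self):
    # No cluster objects are touched, so no locks are needed.
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    if self.op.force or self.op.message:
      feedback_fn("example message: %s" % self.op.message)
    return self.op.message
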
424

    
425

    
426
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
427
  """Simple LU which runs no hooks.
428

429
  This LU is intended as a parent for other LogicalUnits which will
430
  run no hooks, in order to reduce duplicate code.
431

432
  """
433
  HPATH = None
434
  HTYPE = None
435

    
436
  def BuildHooksEnv(self):
437
    """Empty BuildHooksEnv for NoHooksLu.
438

439
    This just raises an error.
440

441
    """
442
    assert False, "BuildHooksEnv called for NoHooksLUs"
443

    
444

    
445
class Tasklet:
446
  """Tasklet base class.
447

448
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
449
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
450
  tasklets know nothing about locks.
451

452
  Subclasses must follow these rules:
453
    - Implement CheckPrereq
454
    - Implement Exec
455

456
  """
457
  def __init__(self, lu):
458
    self.lu = lu
459

    
460
    # Shortcuts
461
    self.cfg = lu.cfg
462
    self.rpc = lu.rpc
463

    
464
  def CheckPrereq(self):
465
    """Check prerequisites for this tasklet.
466

467
    This method should check whether the prerequisites for the execution of
468
    this tasklet are fulfilled. It can do internode communication, but it
469
    should be idempotent - no cluster or system changes are allowed.
470

471
    The method should raise errors.OpPrereqError in case something is not
472
    fulfilled. Its return value is ignored.
473

474
    This method should also update all parameters to their canonical form if it
475
    hasn't been done before.
476

477
    """
478
    pass
479

    
480
  def Exec(self, feedback_fn):
481
    """Execute the tasklet.
482

483
    This method should implement the actual work. It should raise
484
    errors.OpExecError for failures that are somewhat dealt with in code, or
485
    expected.
486

487
    """
488
    raise NotImplementedError
489

    
490

    
491
class _QueryBase:
492
  """Base for query utility classes.
493

494
  """
495
  #: Attribute holding field definitions
496
  FIELDS = None
497

    
498
  def __init__(self, names, fields, use_locking):
499
    """Initializes this class.
500

501
    """
502
    self.names = names
503
    self.use_locking = use_locking
504

    
505
    self.query = query.Query(self.FIELDS, fields)
506
    self.requested_data = self.query.RequestedData()
507

    
508
  @classmethod
509
  def FieldsQuery(cls, fields):
510
    """Returns list of available fields.
511

512
    @return: List of L{objects.QueryFieldDefinition}
513

514
    """
515
    if fields is None:
516
      # Client requests all fields
517
      fdefs = query.GetAllFields(cls.FIELDS.values())
518
    else:
519
      fdefs = query.Query(cls.FIELDS, fields).GetFields()
520

    
521
    return {
522
      "fields": [fdef.ToDict() for fdef in fdefs],
523
      }
524

    
525
  def ExpandNames(self, lu):
526
    """Expand names for this query.
527

528
    See L{LogicalUnit.ExpandNames}.
529

530
    """
531
    raise NotImplementedError()
532

    
533
  def DeclareLocks(self, level):
534
    """Declare locks for this query.
535

536
    See L{LogicalUnit.DeclareLocks}.
537

538
    """
539
    raise NotImplementedError()
540

    
541
  def _GetQueryData(self, lu):
542
    """Collects all data for this query.
543

544
    @return: Query data object
545

546
    """
547
    raise NotImplementedError()
548

    
549
  def NewStyleQuery(self, lu):
550
    """Collect data and execute query.
551

552
    """
553
    data = self._GetQueryData(lu)
554

    
555
    return {
556
      "data": self.query.Query(data),
557
      "fields": [fdef.ToDict()
558
                 for fdef in self.query.GetFields()],
559
      }
560

    
561
  def OldStyleQuery(self, lu):
562
    """Collect data and execute query.
563

564
    """
565
    return self.query.OldStyleQuery(self._GetQueryData(lu))
566

    
567

    
568
def _GetWantedNodes(lu, nodes):
569
  """Returns list of checked and expanded node names.
570

571
  @type lu: L{LogicalUnit}
572
  @param lu: the logical unit on whose behalf we execute
573
  @type nodes: list
574
  @param nodes: list of node names or None for all nodes
575
  @rtype: list
576
  @return: the list of nodes, sorted
577
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
578

579
  """
580
  if nodes:
581
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
582

    
583
  return utils.NiceSort(lu.cfg.GetNodeList())
584

    
585

    
586
def _GetWantedInstances(lu, instances):
587
  """Returns list of checked and expanded instance names.
588

589
  @type lu: L{LogicalUnit}
590
  @param lu: the logical unit on whose behalf we execute
591
  @type instances: list
592
  @param instances: list of instance names or None for all instances
593
  @rtype: list
594
  @return: the list of instances, sorted
595
  @raise errors.OpPrereqError: if the instances parameter is wrong type
596
  @raise errors.OpPrereqError: if any of the passed instances is not found
597

598
  """
599
  if instances:
600
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
601
  else:
602
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
603
  return wanted
604

    
605

    
606
def _GetUpdatedParams(old_params, update_dict,
607
                      use_default=True, use_none=False):
608
  """Return the new version of a parameter dictionary.
609

610
  @type old_params: dict
611
  @param old_params: old parameters
612
  @type update_dict: dict
613
  @param update_dict: dict containing new parameter values, or
614
      constants.VALUE_DEFAULT to reset the parameter to its default
615
      value
616
  @type use_default: boolean
617
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
618
      values as 'to be deleted' values
619
  @type use_none: boolean
620
  @param use_none: whether to recognise C{None} values as 'to be
621
      deleted' values
622
  @rtype: dict
623
  @return: the new parameter dictionary
624

625
  """
626
  params_copy = copy.deepcopy(old_params)
627
  for key, val in update_dict.iteritems():
628
    if ((use_default and val == constants.VALUE_DEFAULT) or
629
        (use_none and val is None)):
630
      try:
631
        del params_copy[key]
632
      except KeyError:
633
        pass
634
    else:
635
      params_copy[key] = val
636
  return params_copy
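
# Example (illustrative values): with old_params = {"a": 1, "b": 2} and
# update_dict = {"a": constants.VALUE_DEFAULT, "c": 3}, the default
# use_default=True behaviour returns {"b": 2, "c": 3} -- "a" reverts to
# its default by being dropped, "c" is added and "b" is left untouched.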
637

    
638

    
639
def _CheckOutputFields(static, dynamic, selected):
640
  """Checks whether all selected fields are valid.
641

642
  @type static: L{utils.FieldSet}
643
  @param static: static fields set
644
  @type dynamic: L{utils.FieldSet}
645
  @param dynamic: dynamic fields set
646

647
  """
648
  f = utils.FieldSet()
649
  f.Extend(static)
650
  f.Extend(dynamic)
651

    
652
  delta = f.NonMatching(selected)
653
  if delta:
654
    raise errors.OpPrereqError("Unknown output fields selected: %s"
655
                               % ",".join(delta), errors.ECODE_INVAL)
656

    
657

    
658
def _CheckGlobalHvParams(params):
659
  """Validates that given hypervisor params are not global ones.
660

661
  This will ensure that instances don't get customised versions of
662
  global params.
663

664
  """
665
  used_globals = constants.HVC_GLOBALS.intersection(params)
666
  if used_globals:
667
    msg = ("The following hypervisor parameters are global and cannot"
668
           " be customized at instance level, please modify them at"
669
           " cluster level: %s" % utils.CommaJoin(used_globals))
670
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
671

    
672

    
673
def _CheckNodeOnline(lu, node, msg=None):
674
  """Ensure that a given node is online.
675

676
  @param lu: the LU on behalf of which we make the check
677
  @param node: the node to check
678
  @param msg: if passed, should be a message to replace the default one
679
  @raise errors.OpPrereqError: if the node is offline
680

681
  """
682
  if msg is None:
683
    msg = "Can't use offline node"
684
  if lu.cfg.GetNodeInfo(node).offline:
685
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
686

    
687

    
688
def _CheckNodeNotDrained(lu, node):
689
  """Ensure that a given node is not drained.
690

691
  @param lu: the LU on behalf of which we make the check
692
  @param node: the node to check
693
  @raise errors.OpPrereqError: if the node is drained
694

695
  """
696
  if lu.cfg.GetNodeInfo(node).drained:
697
    raise errors.OpPrereqError("Can't use drained node %s" % node,
698
                               errors.ECODE_STATE)
699

    
700

    
701
def _CheckNodeVmCapable(lu, node):
702
  """Ensure that a given node is vm capable.
703

704
  @param lu: the LU on behalf of which we make the check
705
  @param node: the node to check
706
  @raise errors.OpPrereqError: if the node is not vm capable
707

708
  """
709
  if not lu.cfg.GetNodeInfo(node).vm_capable:
710
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
711
                               errors.ECODE_STATE)
712

    
713

    
714
def _CheckNodeHasOS(lu, node, os_name, force_variant):
715
  """Ensure that a node supports a given OS.
716

717
  @param lu: the LU on behalf of which we make the check
718
  @param node: the node to check
719
  @param os_name: the OS to query about
720
  @param force_variant: whether to ignore variant errors
721
  @raise errors.OpPrereqError: if the node is not supporting the OS
722

723
  """
724
  result = lu.rpc.call_os_get(node, os_name)
725
  result.Raise("OS '%s' not in supported OS list for node %s" %
726
               (os_name, node),
727
               prereq=True, ecode=errors.ECODE_INVAL)
728
  if not force_variant:
729
    _CheckOSVariant(result.payload, os_name)
730

    
731

    
732
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
733
  """Ensure that a node has the given secondary ip.
734

735
  @type lu: L{LogicalUnit}
736
  @param lu: the LU on behalf of which we make the check
737
  @type node: string
738
  @param node: the node to check
739
  @type secondary_ip: string
740
  @param secondary_ip: the ip to check
741
  @type prereq: boolean
742
  @param prereq: whether to throw a prerequisite or an execute error
743
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
744
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
745

746
  """
747
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
748
  result.Raise("Failure checking secondary ip on node %s" % node,
749
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
750
  if not result.payload:
751
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
752
           " please fix and re-run this command" % secondary_ip)
753
    if prereq:
754
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
755
    else:
756
      raise errors.OpExecError(msg)
757

    
758

    
759
def _RequireFileStorage():
760
  """Checks that file storage is enabled.
761

762
  @raise errors.OpPrereqError: when file storage is disabled
763

764
  """
765
  if not constants.ENABLE_FILE_STORAGE:
766
    raise errors.OpPrereqError("File storage disabled at configure time",
767
                               errors.ECODE_INVAL)
768

    
769

    
770
def _CheckDiskTemplate(template):
771
  """Ensure a given disk template is valid.
772

773
  """
774
  if template not in constants.DISK_TEMPLATES:
775
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
776
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
777
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
778
  if template == constants.DT_FILE:
779
    _RequireFileStorage()
780
  return True
781

    
782

    
783
def _CheckStorageType(storage_type):
784
  """Ensure a given storage type is valid.
785

786
  """
787
  if storage_type not in constants.VALID_STORAGE_TYPES:
788
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
789
                               errors.ECODE_INVAL)
790
  if storage_type == constants.ST_FILE:
791
    _RequireFileStorage()
792
  return True
793

    
794

    
795
def _GetClusterDomainSecret():
796
  """Reads the cluster domain secret.
797

798
  """
799
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
800
                               strict=True)
801

    
802

    
803
def _CheckInstanceDown(lu, instance, reason):
804
  """Ensure that an instance is not running."""
805
  if instance.admin_up:
806
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
807
                               (instance.name, reason), errors.ECODE_STATE)
808

    
809
  pnode = instance.primary_node
810
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
811
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
812
              prereq=True, ecode=errors.ECODE_ENVIRON)
813

    
814
  if instance.name in ins_l.payload:
815
    raise errors.OpPrereqError("Instance %s is running, %s" %
816
                               (instance.name, reason), errors.ECODE_STATE)
817

    
818

    
819
def _ExpandItemName(fn, name, kind):
820
  """Expand an item name.
821

822
  @param fn: the function to use for expansion
823
  @param name: requested item name
824
  @param kind: text description ('Node' or 'Instance')
825
  @return: the resolved (full) name
826
  @raise errors.OpPrereqError: if the item is not found
827

828
  """
829
  full_name = fn(name)
830
  if full_name is None:
831
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
832
                               errors.ECODE_NOENT)
833
  return full_name
834

    
835

    
836
def _ExpandNodeName(cfg, name):
837
  """Wrapper over L{_ExpandItemName} for nodes."""
838
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
839

    
840

    
841
def _ExpandInstanceName(cfg, name):
842
  """Wrapper over L{_ExpandItemName} for instance."""
843
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
844

    
845

    
846
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
847
                          memory, vcpus, nics, disk_template, disks,
848
                          bep, hvp, hypervisor_name):
849
  """Builds instance related env variables for hooks
850

851
  This builds the hook environment from individual variables.
852

853
  @type name: string
854
  @param name: the name of the instance
855
  @type primary_node: string
856
  @param primary_node: the name of the instance's primary node
857
  @type secondary_nodes: list
858
  @param secondary_nodes: list of secondary nodes as strings
859
  @type os_type: string
860
  @param os_type: the name of the instance's OS
861
  @type status: boolean
862
  @param status: the should_run status of the instance
863
  @type memory: string
864
  @param memory: the memory size of the instance
865
  @type vcpus: string
866
  @param vcpus: the count of VCPUs the instance has
867
  @type nics: list
868
  @param nics: list of tuples (ip, mac, mode, link) representing
869
      the NICs the instance has
870
  @type disk_template: string
871
  @param disk_template: the disk template of the instance
872
  @type disks: list
873
  @param disks: the list of (size, mode) pairs
874
  @type bep: dict
875
  @param bep: the backend parameters for the instance
876
  @type hvp: dict
877
  @param hvp: the hypervisor parameters for the instance
878
  @type hypervisor_name: string
879
  @param hypervisor_name: the hypervisor for the instance
880
  @rtype: dict
881
  @return: the hook environment for this instance
882

883
  """
884
  if status:
885
    str_status = "up"
886
  else:
887
    str_status = "down"
888
  env = {
889
    "OP_TARGET": name,
890
    "INSTANCE_NAME": name,
891
    "INSTANCE_PRIMARY": primary_node,
892
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
893
    "INSTANCE_OS_TYPE": os_type,
894
    "INSTANCE_STATUS": str_status,
895
    "INSTANCE_MEMORY": memory,
896
    "INSTANCE_VCPUS": vcpus,
897
    "INSTANCE_DISK_TEMPLATE": disk_template,
898
    "INSTANCE_HYPERVISOR": hypervisor_name,
899
  }
900

    
901
  if nics:
902
    nic_count = len(nics)
903
    for idx, (ip, mac, mode, link) in enumerate(nics):
904
      if ip is None:
905
        ip = ""
906
      env["INSTANCE_NIC%d_IP" % idx] = ip
907
      env["INSTANCE_NIC%d_MAC" % idx] = mac
908
      env["INSTANCE_NIC%d_MODE" % idx] = mode
909
      env["INSTANCE_NIC%d_LINK" % idx] = link
910
      if mode == constants.NIC_MODE_BRIDGED:
911
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
912
  else:
913
    nic_count = 0
914

    
915
  env["INSTANCE_NIC_COUNT"] = nic_count
916

    
917
  if disks:
918
    disk_count = len(disks)
919
    for idx, (size, mode) in enumerate(disks):
920
      env["INSTANCE_DISK%d_SIZE" % idx] = size
921
      env["INSTANCE_DISK%d_MODE" % idx] = mode
922
  else:
923
    disk_count = 0
924

    
925
  env["INSTANCE_DISK_COUNT"] = disk_count
926

    
927
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
928
    for key, value in source.items():
929
      env["INSTANCE_%s_%s" % (kind, key)] = value
930

    
931
  return env
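
# Example (illustrative): an instance with one bridged NIC and one disk
# yields, among others, INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_MAC,
# INSTANCE_NIC0_MODE, INSTANCE_NIC0_BRIDGE, INSTANCE_DISK_COUNT=1 and
# INSTANCE_DISK0_SIZE; the hooks runner later adds the GANETI_ prefix.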
932

    
933

    
934
def _NICListToTuple(lu, nics):
935
  """Build a list of nic information tuples.
936

937
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
938
  value in LUQueryInstanceData.
939

940
  @type lu:  L{LogicalUnit}
941
  @param lu: the logical unit on whose behalf we execute
942
  @type nics: list of L{objects.NIC}
943
  @param nics: list of nics to convert to hooks tuples
944

945
  """
946
  hooks_nics = []
947
  cluster = lu.cfg.GetClusterInfo()
948
  for nic in nics:
949
    ip = nic.ip
950
    mac = nic.mac
951
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
952
    mode = filled_params[constants.NIC_MODE]
953
    link = filled_params[constants.NIC_LINK]
954
    hooks_nics.append((ip, mac, mode, link))
955
  return hooks_nics
956

    
957

    
958
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
959
  """Builds instance related env variables for hooks from an object.
960

961
  @type lu: L{LogicalUnit}
962
  @param lu: the logical unit on whose behalf we execute
963
  @type instance: L{objects.Instance}
964
  @param instance: the instance for which we should build the
965
      environment
966
  @type override: dict
967
  @param override: dictionary with key/values that will override
968
      our values
969
  @rtype: dict
970
  @return: the hook environment dictionary
971

972
  """
973
  cluster = lu.cfg.GetClusterInfo()
974
  bep = cluster.FillBE(instance)
975
  hvp = cluster.FillHV(instance)
976
  args = {
977
    'name': instance.name,
978
    'primary_node': instance.primary_node,
979
    'secondary_nodes': instance.secondary_nodes,
980
    'os_type': instance.os,
981
    'status': instance.admin_up,
982
    'memory': bep[constants.BE_MEMORY],
983
    'vcpus': bep[constants.BE_VCPUS],
984
    'nics': _NICListToTuple(lu, instance.nics),
985
    'disk_template': instance.disk_template,
986
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
987
    'bep': bep,
988
    'hvp': hvp,
989
    'hypervisor_name': instance.hypervisor,
990
  }
991
  if override:
992
    args.update(override)
993
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
994

    
995

    
996
def _AdjustCandidatePool(lu, exceptions):
997
  """Adjust the candidate pool after node operations.
998

999
  """
1000
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1001
  if mod_list:
1002
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1003
               utils.CommaJoin(node.name for node in mod_list))
1004
    for name in mod_list:
1005
      lu.context.ReaddNode(name)
1006
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1007
  if mc_now > mc_max:
1008
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1009
               (mc_now, mc_max))
1010

    
1011

    
1012
def _DecideSelfPromotion(lu, exceptions=None):
1013
  """Decide whether I should promote myself as a master candidate.
1014

1015
  """
1016
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1017
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1018
  # the new node will increase mc_max with one, so:
1019
  mc_should = min(mc_should + 1, cp_size)
1020
  return mc_now < mc_should
1021

    
1022

    
1023
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1024
  """Check that the bridges needed by a list of nics exist.
1025

1026
  """
1027
  cluster = lu.cfg.GetClusterInfo()
1028
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1029
  brlist = [params[constants.NIC_LINK] for params in paramslist
1030
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1031
  if brlist:
1032
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1033
    result.Raise("Error checking bridges on destination node '%s'" %
1034
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1035

    
1036

    
1037
def _CheckInstanceBridgesExist(lu, instance, node=None):
1038
  """Check that the bridges needed by an instance exist.
1039

1040
  """
1041
  if node is None:
1042
    node = instance.primary_node
1043
  _CheckNicsBridgesExist(lu, instance.nics, node)
1044

    
1045

    
1046
def _CheckOSVariant(os_obj, name):
1047
  """Check whether an OS name conforms to the os variants specification.
1048

1049
  @type os_obj: L{objects.OS}
1050
  @param os_obj: OS object to check
1051
  @type name: string
1052
  @param name: OS name passed by the user, to check for validity
1053

1054
  """
1055
  if not os_obj.supported_variants:
1056
    return
1057
  variant = objects.OS.GetVariant(name)
1058
  if not variant:
1059
    raise errors.OpPrereqError("OS name must include a variant",
1060
                               errors.ECODE_INVAL)
1061

    
1062
  if variant not in os_obj.supported_variants:
1063
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1064

    
1065

    
1066
def _GetNodeInstancesInner(cfg, fn):
1067
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1068

    
1069

    
1070
def _GetNodeInstances(cfg, node_name):
1071
  """Returns a list of all primary and secondary instances on a node.
1072

1073
  """
1074

    
1075
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1076

    
1077

    
1078
def _GetNodePrimaryInstances(cfg, node_name):
1079
  """Returns primary instances on a node.
1080

1081
  """
1082
  return _GetNodeInstancesInner(cfg,
1083
                                lambda inst: node_name == inst.primary_node)
1084

    
1085

    
1086
def _GetNodeSecondaryInstances(cfg, node_name):
1087
  """Returns secondary instances on a node.
1088

1089
  """
1090
  return _GetNodeInstancesInner(cfg,
1091
                                lambda inst: node_name in inst.secondary_nodes)
1092

    
1093

    
1094
def _GetStorageTypeArgs(cfg, storage_type):
1095
  """Returns the arguments for a storage type.
1096

1097
  """
1098
  # Special case for file storage
1099
  if storage_type == constants.ST_FILE:
1100
    # storage.FileStorage wants a list of storage directories
1101
    return [[cfg.GetFileStorageDir()]]
1102

    
1103
  return []
1104

    
1105

    
1106
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1107
  faulty = []
1108

    
1109
  for dev in instance.disks:
1110
    cfg.SetDiskID(dev, node_name)
1111

    
1112
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1113
  result.Raise("Failed to get disk status from node %s" % node_name,
1114
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1115

    
1116
  for idx, bdev_status in enumerate(result.payload):
1117
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1118
      faulty.append(idx)
1119

    
1120
  return faulty
1121

    
1122

    
1123
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1124
  """Check the sanity of iallocator and node arguments and use the
1125
  cluster-wide iallocator if appropriate.
1126

1127
  Check that at most one of (iallocator, node) is specified. If none is
1128
  specified, then the LU's opcode's iallocator slot is filled with the
1129
  cluster-wide default iallocator.
1130

1131
  @type iallocator_slot: string
1132
  @param iallocator_slot: the name of the opcode iallocator slot
1133
  @type node_slot: string
1134
  @param node_slot: the name of the opcode target node slot
1135

1136
  """
1137
  node = getattr(lu.op, node_slot, None)
1138
  iallocator = getattr(lu.op, iallocator_slot, None)
1139

    
1140
  if node is not None and iallocator is not None:
1141
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1142
                               errors.ECODE_INVAL)
1143
  elif node is None and iallocator is None:
1144
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1145
    if default_iallocator:
1146
      setattr(lu.op, iallocator_slot, default_iallocator)
1147
    else:
1148
      raise errors.OpPrereqError("No iallocator or node given and no"
1149
                                 " cluster-wide default iallocator found."
1150
                                 " Please specify either an iallocator or a"
1151
                                 " node, or set a cluster-wide default"
1152
                                 " iallocator.")
1153

    
1154

    
1155
class LUPostInitCluster(LogicalUnit):
1156
  """Logical unit for running hooks after cluster initialization.
1157

1158
  """
1159
  HPATH = "cluster-init"
1160
  HTYPE = constants.HTYPE_CLUSTER
1161

    
1162
  def BuildHooksEnv(self):
1163
    """Build hooks env.
1164

1165
    """
1166
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1167
    mn = self.cfg.GetMasterNode()
1168
    return env, [], [mn]
1169

    
1170
  def Exec(self, feedback_fn):
1171
    """Nothing to do.
1172

1173
    """
1174
    return True
1175

    
1176

    
1177
class LUDestroyCluster(LogicalUnit):
1178
  """Logical unit for destroying the cluster.
1179

1180
  """
1181
  HPATH = "cluster-destroy"
1182
  HTYPE = constants.HTYPE_CLUSTER
1183

    
1184
  def BuildHooksEnv(self):
1185
    """Build hooks env.
1186

1187
    """
1188
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1189
    return env, [], []
1190

    
1191
  def CheckPrereq(self):
1192
    """Check prerequisites.
1193

1194
    This checks whether the cluster is empty.
1195

1196
    Any errors are signaled by raising errors.OpPrereqError.
1197

1198
    """
1199
    master = self.cfg.GetMasterNode()
1200

    
1201
    nodelist = self.cfg.GetNodeList()
1202
    if len(nodelist) != 1 or nodelist[0] != master:
1203
      raise errors.OpPrereqError("There are still %d node(s) in"
1204
                                 " this cluster." % (len(nodelist) - 1),
1205
                                 errors.ECODE_INVAL)
1206
    instancelist = self.cfg.GetInstanceList()
1207
    if instancelist:
1208
      raise errors.OpPrereqError("There are still %d instance(s) in"
1209
                                 " this cluster." % len(instancelist),
1210
                                 errors.ECODE_INVAL)
1211

    
1212
  def Exec(self, feedback_fn):
1213
    """Destroys the cluster.
1214

1215
    """
1216
    master = self.cfg.GetMasterNode()
1217

    
1218
    # Run post hooks on master node before it's removed
1219
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
1220
    try:
1221
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
1222
    except:
1223
      # pylint: disable-msg=W0702
1224
      self.LogWarning("Errors occurred running hooks on %s" % master)
1225

    
1226
    result = self.rpc.call_node_stop_master(master, False)
1227
    result.Raise("Could not disable the master role")
1228

    
1229
    return master
1230

    
1231

    
1232
def _VerifyCertificate(filename):
1233
  """Verifies a certificate for LUVerifyCluster.
1234

1235
  @type filename: string
1236
  @param filename: Path to PEM file
1237

1238
  """
1239
  try:
1240
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1241
                                           utils.ReadFile(filename))
1242
  except Exception, err: # pylint: disable-msg=W0703
1243
    return (LUVerifyCluster.ETYPE_ERROR,
1244
            "Failed to load X509 certificate %s: %s" % (filename, err))
1245

    
1246
  (errcode, msg) = \
1247
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1248
                                constants.SSL_CERT_EXPIRATION_ERROR)
1249

    
1250
  if msg:
1251
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1252
  else:
1253
    fnamemsg = None
1254

    
1255
  if errcode is None:
1256
    return (None, fnamemsg)
1257
  elif errcode == utils.CERT_WARNING:
1258
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
1259
  elif errcode == utils.CERT_ERROR:
1260
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
1261

    
1262
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1263

    
1264

    
1265
class LUVerifyCluster(LogicalUnit):
1266
  """Verifies the cluster status.
1267

1268
  """
1269
  HPATH = "cluster-verify"
1270
  HTYPE = constants.HTYPE_CLUSTER
1271
  _OP_PARAMS = [
1272
    ("skip_checks", ht.EmptyList,
1273
     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1274
    ("verbose", False, ht.TBool),
1275
    ("error_codes", False, ht.TBool),
1276
    ("debug_simulate_errors", False, ht.TBool),
1277
    ]
1278
  REQ_BGL = False
1279

    
1280
  TCLUSTER = "cluster"
1281
  TNODE = "node"
1282
  TINSTANCE = "instance"
1283

    
1284
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1285
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1286
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1287
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1288
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1289
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1290
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1291
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1292
  ENODEDRBD = (TNODE, "ENODEDRBD")
1293
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1294
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1295
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1296
  ENODEHV = (TNODE, "ENODEHV")
1297
  ENODELVM = (TNODE, "ENODELVM")
1298
  ENODEN1 = (TNODE, "ENODEN1")
1299
  ENODENET = (TNODE, "ENODENET")
1300
  ENODEOS = (TNODE, "ENODEOS")
1301
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1302
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1303
  ENODERPC = (TNODE, "ENODERPC")
1304
  ENODESSH = (TNODE, "ENODESSH")
1305
  ENODEVERSION = (TNODE, "ENODEVERSION")
1306
  ENODESETUP = (TNODE, "ENODESETUP")
1307
  ENODETIME = (TNODE, "ENODETIME")
1308

    
1309
  ETYPE_FIELD = "code"
1310
  ETYPE_ERROR = "ERROR"
1311
  ETYPE_WARNING = "WARNING"
1312

    
1313
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1314

    
1315
  class NodeImage(object):
1316
    """A class representing the logical and physical status of a node.
1317

1318
    @type name: string
1319
    @ivar name: the node name to which this object refers
1320
    @ivar volumes: a structure as returned from
1321
        L{ganeti.backend.GetVolumeList} (runtime)
1322
    @ivar instances: a list of running instances (runtime)
1323
    @ivar pinst: list of configured primary instances (config)
1324
    @ivar sinst: list of configured secondary instances (config)
1325
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
1326
        of this node (config)
1327
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1328
    @ivar dfree: free disk, as reported by the node (runtime)
1329
    @ivar offline: the offline status (config)
1330
    @type rpc_fail: boolean
1331
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1332
        not whether the individual keys were correct) (runtime)
1333
    @type lvm_fail: boolean
1334
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1335
    @type hyp_fail: boolean
1336
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1337
    @type ghost: boolean
1338
    @ivar ghost: whether this is a known node or not (config)
1339
    @type os_fail: boolean
1340
    @ivar os_fail: whether the RPC call didn't return valid OS data
1341
    @type oslist: list
1342
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1343
    @type vm_capable: boolean
1344
    @ivar vm_capable: whether the node can host instances
1345

1346
    """
1347
    def __init__(self, offline=False, name=None, vm_capable=True):
1348
      self.name = name
1349
      self.volumes = {}
1350
      self.instances = []
1351
      self.pinst = []
1352
      self.sinst = []
1353
      self.sbp = {}
1354
      self.mfree = 0
1355
      self.dfree = 0
1356
      self.offline = offline
1357
      self.vm_capable = vm_capable
1358
      self.rpc_fail = False
1359
      self.lvm_fail = False
1360
      self.hyp_fail = False
1361
      self.ghost = False
1362
      self.os_fail = False
1363
      self.oslist = {}
1364

    
1365
  def ExpandNames(self):
1366
    self.needed_locks = {
1367
      locking.LEVEL_NODE: locking.ALL_SET,
1368
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1369
    }
1370
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1371

    
1372
  def _Error(self, ecode, item, msg, *args, **kwargs):
1373
    """Format an error message.
1374

1375
    Based on the opcode's error_codes parameter, either format a
1376
    parseable error code, or a simpler error string.
1377

1378
    This must be called only from Exec and functions called from Exec.
1379

1380
    """
1381
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1382
    itype, etxt = ecode
1383
    # first complete the msg
1384
    if args:
1385
      msg = msg % args
1386
    # then format the whole message
1387
    if self.op.error_codes:
1388
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1389
    else:
1390
      if item:
1391
        item = " " + item
1392
      else:
1393
        item = ""
1394
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1395
    # and finally report it via the feedback_fn
1396
    self._feedback_fn("  - %s" % msg)
1397

    
1398
  def _ErrorIf(self, cond, *args, **kwargs):
1399
    """Log an error message if the passed condition is True.
1400

1401
    """
1402
    cond = bool(cond) or self.op.debug_simulate_errors
1403
    if cond:
1404
      self._Error(*args, **kwargs)
1405
    # do not mark the operation as failed for WARN cases only
1406
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1407
      self.bad = self.bad or cond
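
  # Example of the two formats produced by _Error above (illustrative
  # node name): with the error_codes opcode parameter set, a report line
  # reads "ERROR:ENODELVM:node:node1.example.com:Can't get PV list from
  # node"; without it, the same condition is rendered as
  # "ERROR: node node1.example.com: Can't get PV list from node".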
1408

    
1409
  def _VerifyNode(self, ninfo, nresult):
1410
    """Perform some basic validation on data returned from a node.
1411

1412
      - check the result data structure is well formed and has all the
1413
        mandatory fields
1414
      - check ganeti version
1415

1416
    @type ninfo: L{objects.Node}
1417
    @param ninfo: the node to check
1418
    @param nresult: the results from the node
1419
    @rtype: boolean
1420
    @return: whether overall this call was successful (and we can expect
1421
         reasonable values in the response)
1422

1423
    """
1424
    node = ninfo.name
1425
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1426

    
1427
    # main result, nresult should be a non-empty dict
1428
    test = not nresult or not isinstance(nresult, dict)
1429
    _ErrorIf(test, self.ENODERPC, node,
1430
                  "unable to verify node: no data returned")
1431
    if test:
1432
      return False
1433

    
1434
    # compares ganeti version
1435
    local_version = constants.PROTOCOL_VERSION
1436
    remote_version = nresult.get("version", None)
1437
    test = not (remote_version and
1438
                isinstance(remote_version, (list, tuple)) and
1439
                len(remote_version) == 2)
1440
    _ErrorIf(test, self.ENODERPC, node,
1441
             "connection to node returned invalid data")
1442
    if test:
1443
      return False
1444

    
1445
    test = local_version != remote_version[0]
1446
    _ErrorIf(test, self.ENODEVERSION, node,
1447
             "incompatible protocol versions: master %s,"
1448
             " node %s", local_version, remote_version[0])
1449
    if test:
1450
      return False
1451

    
1452
    # node seems compatible, we can actually try to look into its results
1453

    
1454
    # full package version
1455
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1456
                  self.ENODEVERSION, node,
1457
                  "software version mismatch: master %s, node %s",
1458
                  constants.RELEASE_VERSION, remote_version[1],
1459
                  code=self.ETYPE_WARNING)
1460

    
1461
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1462
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1463
      for hv_name, hv_result in hyp_result.iteritems():
1464
        test = hv_result is not None
1465
        _ErrorIf(test, self.ENODEHV, node,
1466
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1467

    
1468
    test = nresult.get(constants.NV_NODESETUP,
1469
                           ["Missing NODESETUP results"])
1470
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1471
             "; ".join(test))
1472

    
1473
    return True
1474

    
1475
  def _VerifyNodeTime(self, ninfo, nresult,
1476
                      nvinfo_starttime, nvinfo_endtime):
1477
    """Check the node time.
1478

1479
    @type ninfo: L{objects.Node}
1480
    @param ninfo: the node to check
1481
    @param nresult: the remote results for the node
1482
    @param nvinfo_starttime: the start time of the RPC call
1483
    @param nvinfo_endtime: the end time of the RPC call
1484

1485
    """
1486
    node = ninfo.name
1487
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1488

    
1489
    ntime = nresult.get(constants.NV_TIME, None)
1490
    try:
1491
      ntime_merged = utils.MergeTime(ntime)
1492
    except (ValueError, TypeError):
1493
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1494
      return
1495

    
1496
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1497
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1498
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1499
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1500
    else:
1501
      ntime_diff = None
1502

    
1503
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1504
             "Node time diverges by at least %s from master node time",
1505
             ntime_diff)
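
  # Illustrative numbers: assuming the default NODE_MAX_CLOCK_SKEW of
  # 150 seconds, a node whose clock is 10 minutes ahead of the
  # [nvinfo_starttime, nvinfo_endtime] window is reported with an
  # ntime_diff of roughly 600s, while anything inside the 150s band
  # passes silently.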
1506

    
1507
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1508
    """Check the node LVM data.
1509

1510
    @type ninfo: L{objects.Node}
1511
    @param ninfo: the node to check
1512
    @param nresult: the remote results for the node
1513
    @param vg_name: the configured VG name
1514

1515
    """
1516
    if vg_name is None:
1517
      return
1518

    
1519
    node = ninfo.name
1520
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1521

    
1522
    # checks vg existence and size > 20G
1523
    vglist = nresult.get(constants.NV_VGLIST, None)
1524
    test = not vglist
1525
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1526
    if not test:
1527
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1528
                                            constants.MIN_VG_SIZE)
1529
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1530

    
1531
    # check pv names
1532
    pvlist = nresult.get(constants.NV_PVLIST, None)
1533
    test = pvlist is None
1534
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1535
    if not test:
1536
      # check that ':' is not present in PV names, since it's a
1537
      # special character for lvcreate (denotes the range of PEs to
1538
      # use on the PV)
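      # (illustration, not used by the check itself: in
      # "lvcreate -l 100 vg /dev/sdb1:0-999" the ":0-999" suffix selects a
      # PE range, so a PV whose name contains ':' could not be passed
      # through unambiguously)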
1539
      for _, pvname, owner_vg in pvlist:
1540
        test = ":" in pvname
1541
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1542
                 " '%s' of VG '%s'", pvname, owner_vg)
1543

    
1544
  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.
1546

1547
    @type ninfo: L{objects.Node}
1548
    @param ninfo: the node to check
1549
    @param nresult: the remote results for the node
1550

1551
    """
1552
    node = ninfo.name
1553
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1554

    
1555
    test = constants.NV_NODELIST not in nresult
1556
    _ErrorIf(test, self.ENODESSH, node,
1557
             "node hasn't returned node ssh connectivity data")
1558
    if not test:
1559
      if nresult[constants.NV_NODELIST]:
1560
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1561
          _ErrorIf(True, self.ENODESSH, node,
1562
                   "ssh communication with node '%s': %s", a_node, a_msg)
1563

    
1564
    test = constants.NV_NODENETTEST not in nresult
1565
    _ErrorIf(test, self.ENODENET, node,
1566
             "node hasn't returned node tcp connectivity data")
1567
    if not test:
1568
      if nresult[constants.NV_NODENETTEST]:
1569
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1570
        for anode in nlist:
1571
          _ErrorIf(True, self.ENODENET, node,
1572
                   "tcp communication with node '%s': %s",
1573
                   anode, nresult[constants.NV_NODENETTEST][anode])
1574

    
1575
    test = constants.NV_MASTERIP not in nresult
1576
    _ErrorIf(test, self.ENODENET, node,
1577
             "node hasn't returned node master IP reachability data")
1578
    if not test:
1579
      if not nresult[constants.NV_MASTERIP]:
1580
        if node == self.master_node:
1581
          msg = "the master node cannot reach the master IP (not configured?)"
1582
        else:
1583
          msg = "cannot reach the master IP"
1584
        _ErrorIf(True, self.ENODENET, node, msg)
1585

    
1586
  def _VerifyInstance(self, instance, instanceconfig, node_image,
1587
                      diskstatus):
1588
    """Verify an instance.
1589

1590
    This function checks to see if the required block devices are
1591
    available on the instance's node.
1592

1593
    """
1594
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1595
    node_current = instanceconfig.primary_node
1596

    
1597
    node_vol_should = {}
1598
    instanceconfig.MapLVsByNode(node_vol_should)
1599

    
1600
    for node in node_vol_should:
1601
      n_img = node_image[node]
1602
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1603
        # ignore missing volumes on offline or broken nodes
1604
        continue
1605
      for volume in node_vol_should[node]:
1606
        test = volume not in n_img.volumes
1607
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1608
                 "volume %s missing on node %s", volume, node)
1609

    
1610
    if instanceconfig.admin_up:
1611
      pri_img = node_image[node_current]
1612
      test = instance not in pri_img.instances and not pri_img.offline
1613
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1614
               "instance not running on its primary node %s",
1615
               node_current)
1616

    
1617
    for node, n_img in node_image.items():
1618
      if node != node_current:
1619
        test = instance in n_img.instances
1620
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1621
                 "instance should not run on node %s", node)
1622

    
1623
    diskdata = [(nname, success, status, idx)
1624
                for (nname, disks) in diskstatus.items()
1625
                for idx, (success, status) in enumerate(disks)]
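    # e.g. diskstatus == {"node1": [(True, st0), (False, st1)]} is flattened
    # into [("node1", True, st0, 0), ("node1", False, st1, 1)]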
1626

    
1627
    for nname, success, bdev_status, idx in diskdata:
1628
      _ErrorIf(instanceconfig.admin_up and not success,
1629
               self.EINSTANCEFAULTYDISK, instance,
1630
               "couldn't retrieve status for disk/%s on %s: %s",
1631
               idx, nname, bdev_status)
1632
      _ErrorIf((instanceconfig.admin_up and success and
1633
                bdev_status.ldisk_status == constants.LDS_FAULTY),
1634
               self.EINSTANCEFAULTYDISK, instance,
1635
               "disk/%s on %s is faulty", idx, nname)
1636

    
1637
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1638
    """Verify if there are any unknown volumes in the cluster.
1639

1640
    The .os, .swap and backup volumes are ignored. All other volumes are
1641
    reported as unknown.
1642

1643
    @type reserved: L{ganeti.utils.FieldSet}
1644
    @param reserved: a FieldSet of reserved volume names
1645

1646
    """
1647
    for node, n_img in node_image.items():
1648
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1649
        # skip non-healthy nodes
1650
        continue
1651
      for volume in n_img.volumes:
1652
        test = ((node not in node_vol_should or
1653
                volume not in node_vol_should[node]) and
1654
                not reserved.Matches(volume))
1655
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1656
                      "volume %s is unknown", volume)
1657

    
1658
  def _VerifyOrphanInstances(self, instancelist, node_image):
1659
    """Verify the list of running instances.
1660

1661
    This checks what instances are running but unknown to the cluster.
1662

1663
    """
1664
    for node, n_img in node_image.items():
1665
      for o_inst in n_img.instances:
1666
        test = o_inst not in instancelist
1667
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1668
                      "instance %s on node %s should not exist", o_inst, node)
1669

    
1670
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1671
    """Verify N+1 Memory Resilience.
1672

1673
    Check that if one single node dies we can still start all the
1674
    instances it was primary for.
1675

1676
    """
1677
    for node, n_img in node_image.items():
1678
      # This code checks that every node which is now listed as
1679
      # secondary has enough memory to host all instances it is
1680
      # supposed to should a single other node in the cluster fail.
1681
      # FIXME: not ready for failover to an arbitrary node
1682
      # FIXME: does not support file-backed instances
1683
      # WARNING: we currently take into account down instances as well
1684
      # as up ones, considering that even if they're down someone
1685
      # might want to start them even in the event of a node failure.
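      # Sketch of the data walked below (names illustrative): n_img.sbp ==
      # {"node2": ["inst1", "inst2"]} means this node is secondary for inst1
      # and inst2 whose primary is node2; needed_mem sums their BE_MEMORY
      # (auto-balanced instances only) and must fit into n_img.mfree.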
1686
      for prinode, instances in n_img.sbp.items():
1687
        needed_mem = 0
1688
        for instance in instances:
1689
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1690
          if bep[constants.BE_AUTO_BALANCE]:
1691
            needed_mem += bep[constants.BE_MEMORY]
1692
        test = n_img.mfree < needed_mem
1693
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)
1696

    
1697
  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
1698
                       master_files):
1699
    """Verifies and computes the node required file checksums.
1700

1701
    @type ninfo: L{objects.Node}
1702
    @param ninfo: the node to check
1703
    @param nresult: the remote results for the node
1704
    @param file_list: required list of files
1705
    @param local_cksum: dictionary of local files and their checksums
1706
    @param master_files: list of files that only masters should have
1707

1708
    """
1709
    node = ninfo.name
1710
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1711

    
1712
    remote_cksum = nresult.get(constants.NV_FILELIST, None)
1713
    test = not isinstance(remote_cksum, dict)
1714
    _ErrorIf(test, self.ENODEFILECHECK, node,
1715
             "node hasn't returned file checksum data")
1716
    if test:
1717
      return
1718

    
1719
    for file_name in file_list:
1720
      node_is_mc = ninfo.master_candidate
1721
      must_have = (file_name not in master_files) or node_is_mc
1722
      # missing
1723
      test1 = file_name not in remote_cksum
1724
      # invalid checksum
1725
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1726
      # existing and good
1727
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
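      # Summary of the combinations flagged below:
      #   must_have and test1     -> required file is missing
      #   must_have and test2     -> file present but checksum differs
      #   not must_have and test2 -> outdated file exists where it should not
      #   not must_have and test3 -> up-to-date file exists where it should not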
1728
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1729
               "file '%s' missing", file_name)
1730
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1731
               "file '%s' has wrong checksum", file_name)
1732
      # not candidate and this is not a must-have file
1733
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1734
               "file '%s' should not exist on non master"
1735
               " candidates (and the file is outdated)", file_name)
1736
      # all good, except non-master/non-must have combination
1737
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1738
               "file '%s' should not exist"
1739
               " on non master candidates", file_name)
1740

    
1741
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
1742
                      drbd_map):
1743
    """Verifies and the node DRBD status.
1744

1745
    @type ninfo: L{objects.Node}
1746
    @param ninfo: the node to check
1747
    @param nresult: the remote results for the node
1748
    @param instanceinfo: the dict of instances
1749
    @param drbd_helper: the configured DRBD usermode helper
1750
    @param drbd_map: the DRBD map as returned by
1751
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
1752

1753
    """
1754
    node = ninfo.name
1755
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1756

    
1757
    if drbd_helper:
1758
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
1759
      test = (helper_result is None)
1760
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
1761
               "no drbd usermode helper returned")
1762
      if helper_result:
1763
        status, payload = helper_result
1764
        test = not status
1765
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1766
                 "drbd usermode helper check unsuccessful: %s", payload)
1767
        test = status and (payload != drbd_helper)
1768
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
1769
                 "wrong drbd usermode helper: %s", payload)
1770

    
1771
    # compute the DRBD minors
1772
    node_drbd = {}
1773
    for minor, instance in drbd_map[node].items():
1774
      test = instance not in instanceinfo
1775
      _ErrorIf(test, self.ECLUSTERCFG, None,
1776
               "ghost instance '%s' in temporary DRBD map", instance)
1777
        # ghost instance should not be running, but otherwise we
1778
        # don't give double warnings (both ghost instance and
1779
        # unallocated minor in use)
1780
      if test:
1781
        node_drbd[minor] = (instance, False)
1782
      else:
1783
        instance = instanceinfo[instance]
1784
        node_drbd[minor] = (instance.name, instance.admin_up)
1785

    
1786
    # and now check them
1787
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
1788
    test = not isinstance(used_minors, (tuple, list))
1789
    _ErrorIf(test, self.ENODEDRBD, node,
1790
             "cannot parse drbd status file: %s", str(used_minors))
1791
    if test:
1792
      # we cannot check drbd status
1793
      return
1794

    
1795
    for minor, (iname, must_exist) in node_drbd.items():
1796
      test = minor not in used_minors and must_exist
1797
      _ErrorIf(test, self.ENODEDRBD, node,
1798
               "drbd minor %d of instance %s is not active", minor, iname)
1799
    for minor in used_minors:
1800
      test = minor not in node_drbd
1801
      _ErrorIf(test, self.ENODEDRBD, node,
1802
               "unallocated drbd minor %d is in use", minor)
1803

    
1804
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
1805
    """Builds the node OS structures.
1806

1807
    @type ninfo: L{objects.Node}
1808
    @param ninfo: the node to check
1809
    @param nresult: the remote results for the node
1810
    @param nimg: the node image object
1811

1812
    """
1813
    node = ninfo.name
1814
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1815

    
1816
    remote_os = nresult.get(constants.NV_OSLIST, None)
1817
    test = (not isinstance(remote_os, list) or
1818
            not compat.all(isinstance(v, list) and len(v) == 7
1819
                           for v in remote_os))
1820

    
1821
    _ErrorIf(test, self.ENODEOS, node,
1822
             "node hasn't returned valid OS data")
1823

    
1824
    nimg.os_fail = test
1825

    
1826
    if test:
1827
      return
1828

    
1829
    os_dict = {}
1830

    
1831
    for (name, os_path, status, diagnose,
1832
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
1833

    
1834
      if name not in os_dict:
1835
        os_dict[name] = []
1836

    
1837
      # parameters is a list of lists instead of list of tuples due to
1838
      # JSON lacking a real tuple type, fix it:
1839
      parameters = [tuple(v) for v in parameters]
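      # e.g. a JSON-decoded [["param", "doc"]] becomes [("param", "doc")],
      # which is hashable and can therefore go into the set built below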
1840
      os_dict[name].append((os_path, status, diagnose,
1841
                            set(variants), set(parameters), set(api_ver)))
1842

    
1843
    nimg.oslist = os_dict
1844

    
1845
  def _VerifyNodeOS(self, ninfo, nimg, base):
1846
    """Verifies the node OS list.
1847

1848
    @type ninfo: L{objects.Node}
1849
    @param ninfo: the node to check
1850
    @param nimg: the node image object
1851
    @param base: the 'template' node we match against (e.g. from the master)
1852

1853
    """
1854
    node = ninfo.name
1855
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1856

    
1857
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
1858

    
1859
    for os_name, os_data in nimg.oslist.items():
1860
      assert os_data, "Empty OS status for OS %s?!" % os_name
1861
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
1862
      _ErrorIf(not f_status, self.ENODEOS, node,
1863
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
1864
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
1865
               "OS '%s' has multiple entries (first one shadows the rest): %s",
1866
               os_name, utils.CommaJoin([v[0] for v in os_data]))
1867
      # this will be caught in the backend too
1868
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
1869
               and not f_var, self.ENODEOS, node,
1870
               "OS %s with API at least %d does not declare any variant",
1871
               os_name, constants.OS_API_V15)
1872
      # comparisons with the 'base' image
1873
      test = os_name not in base.oslist
1874
      _ErrorIf(test, self.ENODEOS, node,
1875
               "Extra OS %s not present on reference node (%s)",
1876
               os_name, base.name)
1877
      if test:
1878
        continue
1879
      assert base.oslist[os_name], "Base node has empty OS status?"
1880
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
1881
      if not b_status:
1882
        # base OS is invalid, skipping
1883
        continue
1884
      for kind, a, b in [("API version", f_api, b_api),
1885
                         ("variants list", f_var, b_var),
1886
                         ("parameters", f_param, b_param)]:
1887
        _ErrorIf(a != b, self.ENODEOS, node,
1888
                 "OS %s %s differs from reference node %s: %s vs. %s",
1889
                 kind, os_name, base.name,
1890
                 utils.CommaJoin(a), utils.CommaJoin(b))
1891

    
1892
    # check any missing OSes
1893
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
1894
    _ErrorIf(missing, self.ENODEOS, node,
1895
             "OSes present on reference node %s but missing on this node: %s",
1896
             base.name, utils.CommaJoin(missing))
1897

    
1898
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
1899
    """Verifies and updates the node volume data.
1900

1901
    This function will update a L{NodeImage}'s internal structures
1902
    with data from the remote call.
1903

1904
    @type ninfo: L{objects.Node}
1905
    @param ninfo: the node to check
1906
    @param nresult: the remote results for the node
1907
    @param nimg: the node image object
1908
    @param vg_name: the configured VG name
1909

1910
    """
1911
    node = ninfo.name
1912
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1913

    
1914
    nimg.lvm_fail = True
1915
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1916
    if vg_name is None:
1917
      pass
1918
    elif isinstance(lvdata, basestring):
1919
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1920
               utils.SafeEncode(lvdata))
1921
    elif not isinstance(lvdata, dict):
1922
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1923
    else:
1924
      nimg.volumes = lvdata
1925
      nimg.lvm_fail = False
1926

    
1927
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
1928
    """Verifies and updates the node instance list.
1929

1930
    If the listing was successful, then updates this node's instance
1931
    list. Otherwise, it marks the RPC call as failed for the instance
1932
    list key.
1933

1934
    @type ninfo: L{objects.Node}
1935
    @param ninfo: the node to check
1936
    @param nresult: the remote results for the node
1937
    @param nimg: the node image object
1938

1939
    """
1940
    idata = nresult.get(constants.NV_INSTANCELIST, None)
1941
    test = not isinstance(idata, list)
1942
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
1943
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
1944
    if test:
1945
      nimg.hyp_fail = True
1946
    else:
1947
      nimg.instances = idata
1948

    
1949
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
1950
    """Verifies and computes a node information map
1951

1952
    @type ninfo: L{objects.Node}
1953
    @param ninfo: the node to check
1954
    @param nresult: the remote results for the node
1955
    @param nimg: the node image object
1956
    @param vg_name: the configured VG name
1957

1958
    """
1959
    node = ninfo.name
1960
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1961

    
1962
    # try to read free memory (from the hypervisor)
1963
    hv_info = nresult.get(constants.NV_HVINFO, None)
1964
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
1965
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1966
    if not test:
1967
      try:
1968
        nimg.mfree = int(hv_info["memory_free"])
1969
      except (ValueError, TypeError):
1970
        _ErrorIf(True, self.ENODERPC, node,
1971
                 "node returned invalid nodeinfo, check hypervisor")
1972

    
1973
    # FIXME: devise a free space model for file based instances as well
1974
    if vg_name is not None:
1975
      test = (constants.NV_VGLIST not in nresult or
1976
              vg_name not in nresult[constants.NV_VGLIST])
1977
      _ErrorIf(test, self.ENODELVM, node,
1978
               "node didn't return data for the volume group '%s'"
1979
               " - it is either missing or broken", vg_name)
1980
      if not test:
1981
        try:
1982
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
1983
        except (ValueError, TypeError):
1984
          _ErrorIf(True, self.ENODERPC, node,
1985
                   "node returned invalid LVM info, check LVM status")
1986

    
1987
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
1988
    """Gets per-disk status information for all instances.
1989

1990
    @type nodelist: list of strings
1991
    @param nodelist: Node names
1992
    @type node_image: dict of (name, L{objects.Node})
1993
    @param node_image: Node objects
1994
    @type instanceinfo: dict of (name, L{objects.Instance})
1995
    @param instanceinfo: Instance objects
1996

1997
    """
1998
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1999

    
2000
    node_disks = {}
2001
    node_disks_devonly = {}
2002

    
2003
    for nname in nodelist:
2004
      disks = [(inst, disk)
2005
               for instlist in [node_image[nname].pinst,
2006
                                node_image[nname].sinst]
2007
               for inst in instlist
2008
               for disk in instanceinfo[inst].disks]
2009

    
2010
      if not disks:
2011
        # No need to collect data
2012
        continue
2013

    
2014
      node_disks[nname] = disks
2015

    
2016
      # Creating copies as SetDiskID below will modify the objects and that can
2017
      # lead to incorrect data returned from nodes
2018
      devonly = [dev.Copy() for (_, dev) in disks]
2019

    
2020
      for dev in devonly:
2021
        self.cfg.SetDiskID(dev, nname)
2022

    
2023
      node_disks_devonly[nname] = devonly
2024

    
2025
    assert len(node_disks) == len(node_disks_devonly)
2026

    
2027
    # Collect data from all nodes with disks
2028
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2029
                                                          node_disks_devonly)
2030

    
2031
    assert len(result) == len(node_disks)
2032

    
2033
    instdisk = {}
2034

    
2035
    for (nname, nres) in result.items():
2036
      if nres.offline:
2037
        # Ignore offline node
2038
        continue
2039

    
2040
      disks = node_disks[nname]
2041

    
2042
      msg = nres.fail_msg
2043
      _ErrorIf(msg, self.ENODERPC, nname,
2044
               "while getting disk information: %s", nres.fail_msg)
2045
      if msg:
2046
        # No data from this node
2047
        data = len(disks) * [None]
2048
      else:
2049
        data = nres.payload
2050

    
2051
      for ((inst, _), status) in zip(disks, data):
2052
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2053

    
2054
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2055
                      len(nnames) <= len(instanceinfo[inst].all_nodes)
2056
                      for inst, nnames in instdisk.items()
2057
                      for nname, statuses in nnames.items())
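    # At this point instdisk is shaped like (names illustrative):
    #   {"inst1": {"node1": [status_disk0, status_disk1], ...}, ...}
    # with one status entry per disk of the instance on that node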
2058

    
2059
    return instdisk
2060

    
2061
  def BuildHooksEnv(self):
2062
    """Build hooks env.
2063

2064
    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.
2066

2067
    """
2068
    all_nodes = self.cfg.GetNodeList()
2069
    env = {
2070
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
2071
      }
2072
    for node in self.cfg.GetAllNodesInfo().values():
2073
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
2074

    
2075
    return env, [], all_nodes
2076

    
2077
  def Exec(self, feedback_fn):
2078
    """Verify integrity of cluster, performing various test on nodes.
2079

2080
    """
2081
    self.bad = False
2082
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
2083
    verbose = self.op.verbose
2084
    self._feedback_fn = feedback_fn
2085
    feedback_fn("* Verifying global settings")
2086
    for msg in self.cfg.VerifyConfig():
2087
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
2088

    
2089
    # Check the cluster certificates
2090
    for cert_filename in constants.ALL_CERT_FILES:
2091
      (errcode, msg) = _VerifyCertificate(cert_filename)
2092
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
2093

    
2094
    vg_name = self.cfg.GetVGName()
2095
    drbd_helper = self.cfg.GetDRBDHelper()
2096
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2097
    cluster = self.cfg.GetClusterInfo()
2098
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
2099
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
2100
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
2101
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
2102
                        for iname in instancelist)
2103
    i_non_redundant = [] # Non redundant instances
2104
    i_non_a_balanced = [] # Non auto-balanced instances
2105
    n_offline = 0 # Count of offline nodes
2106
    n_drained = 0 # Count of nodes being drained
2107
    node_vol_should = {}
2108

    
2109
    # FIXME: verify OS list
2110
    # do local checksums
2111
    master_files = [constants.CLUSTER_CONF_FILE]
2112
    master_node = self.master_node = self.cfg.GetMasterNode()
2113
    master_ip = self.cfg.GetMasterIP()
2114

    
2115
    file_names = ssconf.SimpleStore().GetFileList()
2116
    file_names.extend(constants.ALL_CERT_FILES)
2117
    file_names.extend(master_files)
2118
    if cluster.modify_etc_hosts:
2119
      file_names.append(constants.ETC_HOSTS)
2120

    
2121
    local_checksums = utils.FingerprintFiles(file_names)
2122

    
2123
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
2124
    node_verify_param = {
2125
      constants.NV_FILELIST: file_names,
2126
      constants.NV_NODELIST: [node.name for node in nodeinfo
2127
                              if not node.offline],
2128
      constants.NV_HYPERVISOR: hypervisors,
2129
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
2130
                                  node.secondary_ip) for node in nodeinfo
2131
                                 if not node.offline],
2132
      constants.NV_INSTANCELIST: hypervisors,
2133
      constants.NV_VERSION: None,
2134
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2135
      constants.NV_NODESETUP: None,
2136
      constants.NV_TIME: None,
2137
      constants.NV_MASTERIP: (master_node, master_ip),
2138
      constants.NV_OSLIST: None,
2139
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2140
      }
2141

    
2142
    if vg_name is not None:
2143
      node_verify_param[constants.NV_VGLIST] = None
2144
      node_verify_param[constants.NV_LVLIST] = vg_name
2145
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2146
      node_verify_param[constants.NV_DRBDLIST] = None
2147

    
2148
    if drbd_helper:
2149
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2150

    
2151
    # Build our expected cluster state
2152
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2153
                                                 name=node.name,
2154
                                                 vm_capable=node.vm_capable))
2155
                      for node in nodeinfo)
2156

    
2157
    for instance in instancelist:
2158
      inst_config = instanceinfo[instance]
2159

    
2160
      for nname in inst_config.all_nodes:
2161
        if nname not in node_image:
2162
          # ghost node
2163
          gnode = self.NodeImage(name=nname)
2164
          gnode.ghost = True
2165
          node_image[nname] = gnode
2166

    
2167
      inst_config.MapLVsByNode(node_vol_should)
2168

    
2169
      pnode = inst_config.primary_node
2170
      node_image[pnode].pinst.append(instance)
2171

    
2172
      for snode in inst_config.secondary_nodes:
2173
        nimg = node_image[snode]
2174
        nimg.sinst.append(instance)
2175
        if pnode not in nimg.sbp:
2176
          nimg.sbp[pnode] = []
2177
        nimg.sbp[pnode].append(instance)
2178

    
2179
    # At this point, we have the in-memory data structures complete,
2180
    # except for the runtime information, which we'll gather next
2181

    
2182
    # Due to the way our RPC system works, exact response times cannot be
2183
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2184
    # time before and after executing the request, we can at least have a time
2185
    # window.
2186
    nvinfo_starttime = time.time()
2187
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
2188
                                           self.cfg.GetClusterName())
2189
    nvinfo_endtime = time.time()
2190

    
2191
    all_drbd_map = self.cfg.ComputeDRBDMap()
2192

    
2193
    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
2194
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
2195

    
2196
    feedback_fn("* Verifying node status")
2197

    
2198
    refos_img = None
2199

    
2200
    for node_i in nodeinfo:
2201
      node = node_i.name
2202
      nimg = node_image[node]
2203

    
2204
      if node_i.offline:
2205
        if verbose:
2206
          feedback_fn("* Skipping offline node %s" % (node,))
2207
        n_offline += 1
2208
        continue
2209

    
2210
      if node == master_node:
2211
        ntype = "master"
2212
      elif node_i.master_candidate:
2213
        ntype = "master candidate"
2214
      elif node_i.drained:
2215
        ntype = "drained"
2216
        n_drained += 1
2217
      else:
2218
        ntype = "regular"
2219
      if verbose:
2220
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2221

    
2222
      msg = all_nvinfo[node].fail_msg
2223
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
2224
      if msg:
2225
        nimg.rpc_fail = True
2226
        continue
2227

    
2228
      nresult = all_nvinfo[node].payload
2229

    
2230
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2231
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2232
      self._VerifyNodeNetwork(node_i, nresult)
2233
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
2234
                            master_files)
2235

    
2236
      if nimg.vm_capable:
2237
        self._VerifyNodeLVM(node_i, nresult, vg_name)
2238
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
2239
                             all_drbd_map)
2240

    
2241
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2242
        self._UpdateNodeInstances(node_i, nresult, nimg)
2243
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2244
        self._UpdateNodeOS(node_i, nresult, nimg)
2245
        if not nimg.os_fail:
2246
          if refos_img is None:
2247
            refos_img = nimg
2248
          self._VerifyNodeOS(node_i, nimg, refos_img)
2249

    
2250
    feedback_fn("* Verifying instance status")
2251
    for instance in instancelist:
2252
      if verbose:
2253
        feedback_fn("* Verifying instance %s" % instance)
2254
      inst_config = instanceinfo[instance]
2255
      self._VerifyInstance(instance, inst_config, node_image,
2256
                           instdisk[instance])
2257
      inst_nodes_offline = []
2258

    
2259
      pnode = inst_config.primary_node
2260
      pnode_img = node_image[pnode]
2261
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2262
               self.ENODERPC, pnode, "instance %s, connection to"
2263
               " primary node failed", instance)
2264

    
2265
      if pnode_img.offline:
2266
        inst_nodes_offline.append(pnode)
2267

    
2268
      # If the instance is non-redundant we cannot survive losing its primary
2269
      # node, so we are not N+1 compliant. On the other hand we have no disk
2270
      # templates with more than one secondary so that situation is not well
2271
      # supported either.
2272
      # FIXME: does not support file-backed instances
2273
      if not inst_config.secondary_nodes:
2274
        i_non_redundant.append(instance)
2275
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
2276
               instance, "instance has multiple secondary nodes: %s",
2277
               utils.CommaJoin(inst_config.secondary_nodes),
2278
               code=self.ETYPE_WARNING)
2279

    
2280
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2281
        i_non_a_balanced.append(instance)
2282

    
2283
      for snode in inst_config.secondary_nodes:
2284
        s_img = node_image[snode]
2285
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
2286
                 "instance %s, connection to secondary node failed", instance)
2287

    
2288
        if s_img.offline:
2289
          inst_nodes_offline.append(snode)
2290

    
2291
      # warn that the instance lives on offline nodes
2292
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
2293
               "instance lives on offline node(s) %s",
2294
               utils.CommaJoin(inst_nodes_offline))
2295
      # ... or ghost/non-vm_capable nodes
2296
      for node in inst_config.all_nodes:
2297
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
2298
                 "instance lives on ghost node %s", node)
2299
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
2300
                 instance, "instance lives on non-vm_capable node %s", node)
2301

    
2302
    feedback_fn("* Verifying orphan volumes")
2303
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2304
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2305

    
2306
    feedback_fn("* Verifying orphan instances")
2307
    self._VerifyOrphanInstances(instancelist, node_image)
2308

    
2309
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2310
      feedback_fn("* Verifying N+1 Memory redundancy")
2311
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
2312

    
2313
    feedback_fn("* Other Notes")
2314
    if i_non_redundant:
2315
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2316
                  % len(i_non_redundant))
2317

    
2318
    if i_non_a_balanced:
2319
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2320
                  % len(i_non_a_balanced))
2321

    
2322
    if n_offline:
2323
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2324

    
2325
    if n_drained:
2326
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2327

    
2328
    return not self.bad
2329

    
2330
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2331
    """Analyze the post-hooks' result
2332

2333
    This method analyses the hook result, handles it, and sends some
2334
    nicely-formatted feedback back to the user.
2335

2336
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
2337
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2338
    @param hooks_results: the results of the multi-node hooks rpc call
2339
    @param feedback_fn: function used to send feedback back to the caller
2340
    @param lu_result: previous Exec result
2341
    @return: the new Exec result, based on the previous result
2342
        and hook results
2343

2344
    """
2345
    # We only really run POST phase hooks, and are only interested in
2346
    # their results
2347
    if phase == constants.HOOKS_PHASE_POST:
2348
      # Used to change hooks' output to proper indentation
2349
      feedback_fn("* Hooks Results")
2350
      assert hooks_results, "invalid result from hooks"
2351

    
2352
      for node_name in hooks_results:
2353
        res = hooks_results[node_name]
2354
        msg = res.fail_msg
2355
        test = msg and not res.offline
2356
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
2357
                      "Communication failure in hooks execution: %s", msg)
2358
        if res.offline or msg:
2359
          # No need to investigate payload if node is offline or gave an error.
2360
          # override manually lu_result here as _ErrorIf only
2361
          # overrides self.bad
2362
          lu_result = 1
2363
          continue
2364
        for script, hkr, output in res.payload:
2365
          test = hkr == constants.HKR_FAIL
2366
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
2367
                        "Script %s failed, output:", script)
2368
          if test:
2369
            output = self._HOOKS_INDENT_RE.sub('      ', output)
2370
            feedback_fn("%s" % output)
2371
            lu_result = 0
2372

    
2373
      return lu_result
2374

    
2375

    
2376
class LUVerifyDisks(NoHooksLU):
2377
  """Verifies the cluster disks status.
2378

2379
  """
2380
  REQ_BGL = False
2381

    
2382
  def ExpandNames(self):
2383
    self.needed_locks = {
2384
      locking.LEVEL_NODE: locking.ALL_SET,
2385
      locking.LEVEL_INSTANCE: locking.ALL_SET,
2386
    }
2387
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
2388

    
2389
  def Exec(self, feedback_fn):
2390
    """Verify integrity of cluster disks.
2391

2392
    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)
2396

2397
    """
2398
    result = res_nodes, res_instances, res_missing = {}, [], {}
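    # result merely aliases the three containers, so filling res_nodes,
    # res_instances and res_missing below also fills the returned tuple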
2399

    
2400
    nodes = utils.NiceSort(self.cfg.GetNodeList())
2401
    instances = [self.cfg.GetInstanceInfo(name)
2402
                 for name in self.cfg.GetInstanceList()]
2403

    
2404
    nv_dict = {}
2405
    for inst in instances:
2406
      inst_lvs = {}
2407
      if (not inst.admin_up or
2408
          inst.disk_template not in constants.DTS_NET_MIRROR):
2409
        continue
2410
      inst.MapLVsByNode(inst_lvs)
2411
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
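      # e.g. {"inst1": {"nodeA": ["lv1", "lv2"]}} contributes
      # {("nodeA", "lv1"): <inst1>, ("nodeA", "lv2"): <inst1>} to nv_dict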
2412
      for node, vol_list in inst_lvs.iteritems():
2413
        for vol in vol_list:
2414
          nv_dict[(node, vol)] = inst
2415

    
2416
    if not nv_dict:
2417
      return result
2418

    
2419
    vg_names = self.rpc.call_vg_list(nodes)
2420
    vg_names.Raise("Cannot get list of VGs")
2421

    
2422
    for node in nodes:
2423
      # node_volume
2424
      node_res = self.rpc.call_lv_list([node],
2425
                                       vg_names[node].payload.keys())[node]
2426
      if node_res.offline:
2427
        continue
2428
      msg = node_res.fail_msg
2429
      if msg:
2430
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2431
        res_nodes[node] = msg
2432
        continue
2433

    
2434
      lvs = node_res.payload
2435
      for lv_name, (_, _, lv_online) in lvs.items():
2436
        inst = nv_dict.pop((node, lv_name), None)
2437
        if (not lv_online and inst is not None
2438
            and inst.name not in res_instances):
2439
          res_instances.append(inst.name)
2440

    
2441
    # any leftover items in nv_dict are missing LVs, let's arrange the
2442
    # data better
2443
    for key, inst in nv_dict.iteritems():
2444
      if inst.name not in res_missing:
2445
        res_missing[inst.name] = []
2446
      res_missing[inst.name].append(key)
2447

    
2448
    return result
2449

    
2450

    
2451
class LURepairDiskSizes(NoHooksLU):
2452
  """Verifies the cluster disks sizes.
2453

2454
  """
2455
  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
2456
  REQ_BGL = False
2457

    
2458
  def ExpandNames(self):
2459
    if self.op.instances:
2460
      self.wanted_names = []
2461
      for name in self.op.instances:
2462
        full_name = _ExpandInstanceName(self.cfg, name)
2463
        self.wanted_names.append(full_name)
2464
      self.needed_locks = {
2465
        locking.LEVEL_NODE: [],
2466
        locking.LEVEL_INSTANCE: self.wanted_names,
2467
        }
2468
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2469
    else:
2470
      self.wanted_names = None
2471
      self.needed_locks = {
2472
        locking.LEVEL_NODE: locking.ALL_SET,
2473
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2474
        }
2475
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2476

    
2477
  def DeclareLocks(self, level):
2478
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2479
      self._LockInstancesNodes(primary_only=True)
2480

    
2481
  def CheckPrereq(self):
2482
    """Check prerequisites.
2483

2484
    This only checks the optional instance list against the existing names.
2485

2486
    """
2487
    if self.wanted_names is None:
2488
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2489

    
2490
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2491
                             in self.wanted_names]
2492

    
2493
  def _EnsureChildSizes(self, disk):
2494
    """Ensure children of the disk have the needed disk size.
2495

2496
    This is valid mainly for DRBD8 and fixes an issue where the
2497
    children have smaller disk size.
2498

2499
    @param disk: an L{ganeti.objects.Disk} object
2500

2501
    """
2502
    if disk.dev_type == constants.LD_DRBD8:
2503
      assert disk.children, "Empty children for DRBD8?"
2504
      fchild = disk.children[0]
2505
      mismatch = fchild.size < disk.size
2506
      if mismatch:
2507
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2508
                     fchild.size, disk.size)
2509
        fchild.size = disk.size
2510

    
2511
      # and we recurse on this child only, not on the metadev
2512
      return self._EnsureChildSizes(fchild) or mismatch
2513
    else:
2514
      return False
2515

    
2516
  def Exec(self, feedback_fn):
2517
    """Verify the size of cluster disks.
2518

2519
    """
2520
    # TODO: check child disks too
2521
    # TODO: check differences in size between primary/secondary nodes
2522
    per_node_disks = {}
2523
    for instance in self.wanted_instances:
2524
      pnode = instance.primary_node
2525
      if pnode not in per_node_disks:
2526
        per_node_disks[pnode] = []
2527
      for idx, disk in enumerate(instance.disks):
2528
        per_node_disks[pnode].append((instance, idx, disk))
2529

    
2530
    changed = []
2531
    for node, dskl in per_node_disks.items():
2532
      newl = [v[2].Copy() for v in dskl]
2533
      for dsk in newl:
2534
        self.cfg.SetDiskID(dsk, node)
2535
      result = self.rpc.call_blockdev_getsizes(node, newl)
2536
      if result.fail_msg:
2537
        self.LogWarning("Failure in blockdev_getsizes call to node"
2538
                        " %s, ignoring", node)
2539
        continue
2540
      if len(result.data) != len(dskl):
2541
        self.LogWarning("Invalid result from node %s, ignoring node results",
2542
                        node)
2543
        continue
2544
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2545
        if size is None:
2546
          self.LogWarning("Disk %d of instance %s did not return size"
2547
                          " information, ignoring", idx, instance.name)
2548
          continue
2549
        if not isinstance(size, (int, long)):
2550
          self.LogWarning("Disk %d of instance %s did not return valid"
2551
                          " size information, ignoring", idx, instance.name)
2552
          continue
2553
        size = size >> 20  # node reports sizes in bytes, config uses MiB
2554
        if size != disk.size:
2555
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2556
                       " correcting: recorded %d, actual %d", idx,
2557
                       instance.name, disk.size, size)
2558
          disk.size = size
2559
          self.cfg.Update(instance, feedback_fn)
2560
          changed.append((instance.name, idx, size))
2561
        if self._EnsureChildSizes(disk):
2562
          self.cfg.Update(instance, feedback_fn)
2563
          changed.append((instance.name, idx, disk.size))
2564
    return changed
2565

    
2566

    
2567
class LURenameCluster(LogicalUnit):
2568
  """Rename the cluster.
2569

2570
  """
2571
  HPATH = "cluster-rename"
2572
  HTYPE = constants.HTYPE_CLUSTER
2573
  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
2574

    
2575
  def BuildHooksEnv(self):
2576
    """Build hooks env.
2577

2578
    """
2579
    env = {
2580
      "OP_TARGET": self.cfg.GetClusterName(),
2581
      "NEW_NAME": self.op.name,
2582
      }
2583
    mn = self.cfg.GetMasterNode()
2584
    all_nodes = self.cfg.GetNodeList()
2585
    return env, [mn], all_nodes
2586

    
2587
  def CheckPrereq(self):
2588
    """Verify that the passed name is a valid one.
2589

2590
    """
2591
    hostname = netutils.GetHostname(name=self.op.name,
2592
                                    family=self.cfg.GetPrimaryIPFamily())
2593

    
2594
    new_name = hostname.name
2595
    self.ip = new_ip = hostname.ip
2596
    old_name = self.cfg.GetClusterName()
2597
    old_ip = self.cfg.GetMasterIP()
2598
    if new_name == old_name and new_ip == old_ip:
2599
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2600
                                 " cluster has changed",
2601
                                 errors.ECODE_INVAL)
2602
    if new_ip != old_ip:
2603
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2604
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2605
                                   " reachable on the network" %
2606
                                   new_ip, errors.ECODE_NOTUNIQUE)
2607

    
2608
    self.op.name = new_name
2609

    
2610
  def Exec(self, feedback_fn):
2611
    """Rename the cluster.
2612

2613
    """
2614
    clustername = self.op.name
2615
    ip = self.ip
2616

    
2617
    # shutdown the master IP
2618
    master = self.cfg.GetMasterNode()
2619
    result = self.rpc.call_node_stop_master(master, False)
2620
    result.Raise("Could not disable the master role")
2621

    
2622
    try:
2623
      cluster = self.cfg.GetClusterInfo()
2624
      cluster.cluster_name = clustername
2625
      cluster.master_ip = ip
2626
      self.cfg.Update(cluster, feedback_fn)
2627

    
2628
      # update the known hosts file
2629
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2630
      node_list = self.cfg.GetOnlineNodeList()
2631
      try:
2632
        node_list.remove(master)
2633
      except ValueError:
2634
        pass
2635
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
2636
    finally:
2637
      result = self.rpc.call_node_start_master(master, False, False)
2638
      msg = result.fail_msg
2639
      if msg:
2640
        self.LogWarning("Could not re-enable the master role on"
2641
                        " the master, please restart manually: %s", msg)
2642

    
2643
    return clustername
2644

    
2645

    
2646
class LUSetClusterParams(LogicalUnit):
2647
  """Change the parameters of the cluster.
2648

2649
  """
2650
  HPATH = "cluster-modify"
2651
  HTYPE = constants.HTYPE_CLUSTER
2652
  _OP_PARAMS = [
2653
    ("vg_name", None, ht.TMaybeString),
2654
    ("enabled_hypervisors", None,
2655
     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
2656
            ht.TNone)),
2657
    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2658
                              ht.TNone)),
2659
    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
2660
    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2661
                            ht.TNone)),
2662
    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
2663
                              ht.TNone)),
2664
    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
2665
    ("uid_pool", None, ht.NoType),
2666
    ("add_uids", None, ht.NoType),
2667
    ("remove_uids", None, ht.NoType),
2668
    ("maintain_node_health", None, ht.TMaybeBool),
2669
    ("prealloc_wipe_disks", None, ht.TMaybeBool),
2670
    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
2671
    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
2672
    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
2673
    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
2674
    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
2675
    ("hidden_os", None, ht.TOr(ht.TListOf(\
2676
          ht.TAnd(ht.TList,
2677
                ht.TIsLength(2),
2678
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2679
          ht.TNone)),
2680
    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
2681
          ht.TAnd(ht.TList,
2682
                ht.TIsLength(2),
2683
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
2684
          ht.TNone)),
2685
    ]
2686
  REQ_BGL = False
2687

    
2688
  def CheckArguments(self):
2689
    """Check parameters
2690

2691
    """
2692
    if self.op.uid_pool:
2693
      uidpool.CheckUidPool(self.op.uid_pool)
2694

    
2695
    if self.op.add_uids:
2696
      uidpool.CheckUidPool(self.op.add_uids)
2697

    
2698
    if self.op.remove_uids:
2699
      uidpool.CheckUidPool(self.op.remove_uids)
2700

    
2701
  def ExpandNames(self):
2702
    # FIXME: in the future maybe other cluster params won't require checking on
2703
    # all nodes to be modified.
2704
    self.needed_locks = {
2705
      locking.LEVEL_NODE: locking.ALL_SET,
2706
    }
2707
    self.share_locks[locking.LEVEL_NODE] = 1
2708

    
2709
  def BuildHooksEnv(self):
2710
    """Build hooks env.
2711

2712
    """
2713
    env = {
2714
      "OP_TARGET": self.cfg.GetClusterName(),
2715
      "NEW_VG_NAME": self.op.vg_name,
2716
      }
2717
    mn = self.cfg.GetMasterNode()
2718
    return env, [mn], [mn]
2719

    
2720
  def CheckPrereq(self):
2721
    """Check prerequisites.
2722

2723
    This checks whether the given params don't conflict and
2724
    if the given volume group is valid.
2725

2726
    """
2727
    if self.op.vg_name is not None and not self.op.vg_name:
2728
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
2729
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
2730
                                   " instances exist", errors.ECODE_INVAL)
2731

    
2732
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
2733
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
2734
        raise errors.OpPrereqError("Cannot disable drbd helper while"
2735
                                   " drbd-based instances exist",
2736
                                   errors.ECODE_INVAL)
2737

    
2738
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2739

    
2740
    # if vg_name not None, checks given volume group on all nodes
2741
    if self.op.vg_name:
2742
      vglist = self.rpc.call_vg_list(node_list)
2743
      for node in node_list:
2744
        msg = vglist[node].fail_msg
2745
        if msg:
2746
          # ignoring down node
2747
          self.LogWarning("Error while gathering data on node %s"
2748
                          " (ignoring node): %s", node, msg)
2749
          continue
2750
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2751
                                              self.op.vg_name,
2752
                                              constants.MIN_VG_SIZE)
2753
        if vgstatus:
2754
          raise errors.OpPrereqError("Error on node '%s': %s" %
2755
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2756

    
2757
    if self.op.drbd_helper:
2758
      # checks given drbd helper on all nodes
2759
      helpers = self.rpc.call_drbd_helper(node_list)
2760
      for node in node_list:
2761
        ninfo = self.cfg.GetNodeInfo(node)
2762
        if ninfo.offline:
2763
          self.LogInfo("Not checking drbd helper on offline node %s", node)
2764
          continue
2765
        msg = helpers[node].fail_msg
2766
        if msg:
2767
          raise errors.OpPrereqError("Error checking drbd helper on node"
2768
                                     " '%s': %s" % (node, msg),
2769
                                     errors.ECODE_ENVIRON)
2770
        node_helper = helpers[node].payload
2771
        if node_helper != self.op.drbd_helper:
2772
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
2773
                                     (node, node_helper), errors.ECODE_ENVIRON)
2774

    
2775
    self.cluster = cluster = self.cfg.GetClusterInfo()
2776
    # validate params changes
2777
    if self.op.beparams:
2778
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2779
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
2780

    
2781
    if self.op.ndparams:
2782
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
2783
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
2784

    
2785
    if self.op.nicparams:
2786
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2787
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
2788
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2789
      nic_errors = []
2790

    
2791
      # check all instances for consistency
2792
      for instance in self.cfg.GetAllInstancesInfo().values():
2793
        for nic_idx, nic in enumerate(instance.nics):
2794
          params_copy = copy.deepcopy(nic.nicparams)
2795
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
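          # FillDict overlays the per-NIC overrides (params_copy) on the
          # proposed cluster defaults, so the syntax check below sees the
          # effective values each NIC would end up with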
2796

    
2797
          # check parameter syntax
2798
          try:
2799
            objects.NIC.CheckParameterSyntax(params_filled)
2800
          except errors.ConfigurationError, err:
2801
            nic_errors.append("Instance %s, nic/%d: %s" %
2802
                              (instance.name, nic_idx, err))
2803

    
2804
          # if we're moving instances to routed, check that they have an ip
2805
          target_mode = params_filled[constants.NIC_MODE]
2806
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2807
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
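
    # Added illustrative comment (editorial, not original code): each entry in
    # "mods" above is an (action, value) pair, so a hypothetical opcode could
    # pass e.g. [(constants.DDM_ADD, "dummy-os"), (constants.DDM_REMOVE,
    # "other-os")] to add one OS name to the list and drop another; the OS
    # names here are made up purely for illustration.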

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    self.cfg.Update(self.cluster, feedback_fn)


def _UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)
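
# Added illustrative comment (editorial, not original code): a typical call
# distributes one file to a set of nodes, e.g.
#   _UploadHelper(lu, ["node1.example.com", "node2.example.com"],
#                 constants.ETC_HOSTS)
# where the node names are made up; per-node failures are only reported as
# warnings through lu.proc.LogWarning.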


def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  if myself.name in vm_nodes:
    vm_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
                   ])

  vm_files = set()
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    vm_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    _UploadHelper(lu, dist_nodes, fname)
  for fname in vm_files:
    _UploadHelper(lu, vm_nodes, fname)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (disks[i].iv_name, mstat.sync_percent, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
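
# Added illustrative comment (editorial, not original code): callers typically
# use the return value to decide whether to proceed, e.g.
#   disk_abort = not _WaitForSync(lu, instance)
# where a False return means at least one disk was still degraded when the
# wait finished.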


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
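
# Added illustrative comment (editorial, not original code): per the docstring
# above, a call such as
#   _CheckDiskConsistency(lu, dev, node, False, ldisk=True)
# restricts the check to the local storage status (constants.LDS_OKAY) instead
# of the overall is_degraded flag.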


class LUOutOfBand(NoHooksLU):
  """Logical unit for OOB handling.

  """
  _OP_PARAMS = [
    _PNodeName,
    ("command", None, ht.TElemOf(constants.OOB_COMMANDS)),
    ("timeout", constants.OOB_TIMEOUT, ht.TInt),
    ]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - OOB is supported

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)

    if node is None:
      raise errors.OpPrereqError("Node %s not found" % self.op.node_name)

    self.oob_program = self.cfg.GetOobProgram(node)

    if not self.oob_program:
      raise errors.OpPrereqError("OOB is not supported for node %s" %
                                 self.op.node_name)

    self.op.node_name = node.name
    self.node = node

  def ExpandNames(self):
    """Gather locks we need.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def Exec(self, feedback_fn):
    """Execute OOB and return result if we expect any.

    """
    master_node = self.cfg.GetMasterNode()

    logging.info("Executing out-of-band command '%s' using '%s' on %s",
                 self.op.command, self.oob_program, self.op.node_name)
    result = self.rpc.call_run_oob(master_node, self.oob_program,
                                   self.op.command, self.op.node_name,
                                   self.op.timeout)

    result.Raise("An error occurred on execution of OOB helper")

    if self.op.command == constants.OOB_HEALTH:
      # For health we should log important events
      for item, status in result.payload:
        if status in [constants.OOB_STATUS_WARNING,
                      constants.OOB_STATUS_CRITICAL]:
          logging.warning("On node '%s' item '%s' has status '%s'",
                          self.op.node_name, item, status)

    return result.payload
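
  # Added illustrative comment (editorial, not original code): for OOB_HEALTH
  # the payload iterated above is a list of (item, status) pairs, e.g. a
  # hypothetical ("PSU1", constants.OOB_STATUS_WARNING) entry; only WARNING
  # and CRITICAL statuses are logged.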


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _HID = "hidden"
  _BLK = "blacklisted"
  _VLD = "valid"
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
                                   "parameters", "api_versions", _HID, _BLK)

  def CheckArguments(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another
        map, with nodes as keys and tuples of (path, status, diagnose,
        variants, parameters, api_versions) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "", [], [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, variants,
           params, api_versions) in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_os[name][node_name].append((path, status, diagnose,
                                        variants, params, api_versions))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []
    cluster = self.cfg.GetClusterInfo()

    for os_name in utils.NiceSort(pol.keys()):
      os_data = pol[os_name]
      row = []
      valid = True
      (variants, params, api_versions) = null_state = (set(), set(), set())
      for idx, osl in enumerate(os_data.values()):
        valid = bool(valid and osl and osl[0][1])
        if not valid:
          (variants, params, api_versions) = null_state
          break
        node_variants, node_params, node_api = osl[0][3:6]
        if idx == 0: # first entry
          variants = set(node_variants)
          params = set(node_params)
          api_versions = set(node_api)
        else: # keep consistency
          variants.intersection_update(node_variants)
          params.intersection_update(node_params)
          api_versions.intersection_update(node_api)

      is_hid = os_name in cluster.hidden_os
      is_blk = os_name in cluster.blacklisted_os
      if ((self._HID not in self.op.output_fields and is_hid) or
          (self._BLK not in self.op.output_fields and is_blk) or
          (self._VLD not in self.op.output_fields and not valid)):
        continue

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == self._VLD:
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = utils.NiceSort(list(variants))
        elif field == "parameters":
          val = list(params)
        elif field == "api_versions":
          val = list(api_versions)
        elif field == self._HID:
          val = is_hid
        elif field == self._BLK:
          val = is_blk
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks[locking.LEVEL_NODE] = 1

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted

  def DeclareLocks(self, _):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    if self.do_locking:
      nodenames = lu.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError("Some nodes were removed before retrieving"
                                 " their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      node_data = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
                                        lu.cfg.GetHypervisorType())
      live_data = dict((name, nresult.payload)
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_PARAMS = [
    _POutputFields,
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("use_locking", False, ht.TBool),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(self.op.names, self.op.output_fields,
                         self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_PARAMS = [
    _POutputFields,
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  _OP_PARAMS = [
    _POutputFields,
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("name", None, ht.TMaybeString),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


def _InstanceQuery(*args): # pylint: disable-msg=W0613
  """Dummy until instance queries have been converted to query2.

  """
  raise NotImplementedError


#: Query type implementations
_QUERY_IMPL = {
  constants.QR_INSTANCE: _InstanceQuery,
  constants.QR_NODE: _NodeQuery,
  }


def _GetQueryImplementation(name):
  """Returns the implementation for a query type.

  @param name: Query type, must be one of L{constants.QR_OP_QUERY}

  """
  try:
    return _QUERY_IMPL[name]
  except KeyError:
    raise errors.OpPrereqError("Unknown query resource '%s'" % name,
                               errors.ECODE_INVAL)
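
# Added illustrative comment (editorial, not original code): LUQuery below
# resolves the opcode's query type to its implementation class and then
# instantiates it, roughly
#   qcls = _GetQueryImplementation(constants.QR_NODE)   # -> _NodeQuery
#   impl = qcls(["node1.example.com"], ["name"], False)
# where the node name and field list are made-up example values.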


class LUQuery(NoHooksLU):
  """Query for resources/items of a certain kind.

  """
  # pylint: disable-msg=W0142
  _OP_PARAMS = [
    ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)),
    ("fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
    ("filter", None, ht.TOr(ht.TNone,
                            ht.TListOf(ht.TOr(ht.TNonEmptyString, ht.TList)))),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    qcls = _GetQueryImplementation(self.op.what)
    names = qlang.ReadSimpleFilter("name", self.op.filter)

    self.impl = qcls(names, self.op.fields, False)

  def ExpandNames(self):
    self.impl.ExpandNames(self)

  def DeclareLocks(self, level):
    self.impl.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.impl.NewStyleQuery(self)


class LUQueryFields(NoHooksLU):
  """Query for resources/items of a certain kind.

  """
  # pylint: disable-msg=W0142
  _OP_PARAMS = [
    ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)),
    ("fields", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.qcls = _GetQueryImplementation(self.op.what)

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    return self.qcls.FieldsQuery(self.op.fields)


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_PARAMS = [
    _PNodeName,
    ("storage_type", ht.NoDefault, _CheckStorageType),
    ("name", ht.NoDefault, ht.TNonEmptyString),
    ("changes", ht.NoDefault, ht.TDict),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)
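
  # Added illustrative comment (editorial, not original code): "changes" is a
  # dict keyed by modifiable storage fields; for an LVM physical volume this
  # would typically look like {constants.SF_ALLOCATABLE: True}, assuming
  # SF_ALLOCATABLE is listed as modifiable for that storage type.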

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_PARAMS = [
    _PNodeName,
    ("primary_ip", None, ht.NoType),
    ("secondary_ip", None, ht.TMaybeString),
    ("readd", False, ht.TBool),
    ("group", None, ht.TMaybeString),
    ("master_capable", None, ht.TMaybeBool),
    ("vm_capable", None, ht.TMaybeBool),
    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
    ]
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name
    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      old_node = self.cfg.GetNodeInfo(node)
      assert old_node is not None, "Can't retrieve locked node %s" % node
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(old_node, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(node)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = old_node
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group)

    if