1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import os.path
33
import time
34
import re
35
import platform
36
import logging
37
import copy
38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
42

    
43
from ganeti import ssh
44
from ganeti import utils
45
from ganeti import errors
46
from ganeti import hypervisor
47
from ganeti import locking
48
from ganeti import constants
49
from ganeti import objects
50
from ganeti import serializer
51
from ganeti import ssconf
52
from ganeti import uidpool
53
from ganeti import compat
54
from ganeti import masterd
55
from ganeti import netutils
56
from ganeti import ht
57
from ganeti import query
58
from ganeti import qlang
59

    
60
import ganeti.masterd.instance # pylint: disable-msg=W0611
61

    
62
# Common opcode attributes
63

    
64
#: output fields for a query operation
65
_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))
66

    
67

    
68
#: the shutdown timeout
69
_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
70
                     ht.TPositiveInt)
71

    
72
#: the force parameter
73
_PForce = ("force", False, ht.TBool)
74

    
75
#: a required instance name (for single-instance LUs)
76
_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)
77

    
78
#: Whether to ignore offline nodes
79
_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)
80

    
81
#: a required node name (for single-node LUs)
82
_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)
83

    
84
#: a required node group name (for single-group LUs)
85
_PGroupName = ("group_name", ht.NoDefault, ht.TNonEmptyString)
86

    
87
#: the migration type (live/non-live)
88
_PMigrationMode = ("mode", None,
89
                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))
90

    
91
#: the obsolete 'live' mode (boolean)
92
_PMigrationLive = ("live", None, ht.TMaybeBool)
93

    
94

    
95
# End types
96
class LogicalUnit(object):
97
  """Logical Unit base class.
98

99
  Subclasses must follow these rules:
100
    - implement ExpandNames
101
    - implement CheckPrereq (except when tasklets are used)
102
    - implement Exec (except when tasklets are used)
103
    - implement BuildHooksEnv
104
    - redefine HPATH and HTYPE
105
    - optionally redefine their run requirements:
106
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
107

108
  Note that all commands require root permissions.
109

110
  @ivar dry_run_result: the value (if any) that will be returned to the caller
111
      in dry-run mode (signalled by opcode dry_run parameter)
112
  @cvar _OP_PARAMS: a list of opcode attributes, the default values
113
      they should get if not already defined, and types they must match
114

115
  """
116
  HPATH = None
117
  HTYPE = None
118
  _OP_PARAMS = []
119
  REQ_BGL = True
120

    
121
  def __init__(self, processor, op, context, rpc):
122
    """Constructor for LogicalUnit.
123

124
    This needs to be overridden in derived classes in order to check op
125
    validity.
126

127
    """
128
    self.proc = processor
129
    self.op = op
130
    self.cfg = context.cfg
131
    self.context = context
132
    self.rpc = rpc
133
    # Dicts used to declare locking needs to mcpu
134
    self.needed_locks = None
135
    self.acquired_locks = {}
136
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
137
    self.add_locks = {}
138
    self.remove_locks = {}
139
    # Used to force good behavior when calling helper functions
140
    self.recalculate_locks = {}
141
    self.__ssh = None
142
    # logging
143
    self.Log = processor.Log # pylint: disable-msg=C0103
144
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
145
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
146
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
147
    # support for dry-run
148
    self.dry_run_result = None
149
    # support for generic debug attribute
150
    if (not hasattr(self.op, "debug_level") or
151
        not isinstance(self.op.debug_level, int)):
152
      self.op.debug_level = 0
153

    
154
    # Tasklets
155
    self.tasklets = None
156

    
157
    # The new kind-of-type-system
158
    op_id = self.op.OP_ID
159
    for attr_name, aval, test in self._OP_PARAMS:
160
      if not hasattr(op, attr_name):
161
        if aval == ht.NoDefault:
162
          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
163
                                     (op_id, attr_name), errors.ECODE_INVAL)
164
        else:
165
          if callable(aval):
166
            dval = aval()
167
          else:
168
            dval = aval
169
          setattr(self.op, attr_name, dval)
170
      attr_val = getattr(op, attr_name)
171
      if test == ht.NoType:
172
        # no tests here
173
        continue
174
      if not callable(test):
175
        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
176
                                     " given type is not a proper type (%s)" %
177
                                     (op_id, attr_name, test))
178
      if not test(attr_val):
179
        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
180
                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
181
        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
182
                                   (op_id, attr_name), errors.ECODE_INVAL)
183

    
184
    self.CheckArguments()
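  # A minimal, hypothetical illustration (not part of the original module) of
  # how the parameter loop above behaves for an LU reusing the common opcode
  # attributes defined near the top of this file:
  #
  #   class LUExample(LogicalUnit):        # OP_ID assumed to be "OP_EXAMPLE"
  #     _OP_PARAMS = [_PInstanceName, _PForce]
  #
  # If the submitted opcode omits "force", self.op.force is filled with the
  # default False; if it omits "instance_name" (default ht.NoDefault), the
  # constructor raises OpPrereqError("Required parameter
  # 'OP_EXAMPLE.instance_name' missing"); and a non-boolean "force" fails the
  # ht.TBool test, raising OpPrereqError("Parameter 'OP_EXAMPLE.force' fails
  # validation").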
185

    
186
  def __GetSSH(self):
187
    """Returns the SshRunner object
188

189
    """
190
    if not self.__ssh:
191
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
192
    return self.__ssh
193

    
194
  ssh = property(fget=__GetSSH)
195

    
196
  def CheckArguments(self):
197
    """Check syntactic validity for the opcode arguments.
198

199
    This method is for doing a simple syntactic check and ensuring the
200
    validity of opcode parameters, without any cluster-related
201
    checks. While the same can be accomplished in ExpandNames and/or
202
    CheckPrereq, doing these separate is better because:
203

204
      - ExpandNames is left as purely a lock-related function
205
      - CheckPrereq is run after we have acquired locks (and possibly
206
        waited for them)
207

208
    The function is allowed to change the self.op attribute so that
209
    later methods no longer need to worry about missing parameters.
210

211
    """
212
    pass
213

    
214
  def ExpandNames(self):
215
    """Expand names for this LU.
216

217
    This method is called before starting to execute the opcode, and it should
218
    update all the parameters of the opcode to their canonical form (e.g. a
219
    short node name must be fully expanded after this method has successfully
220
    completed). This way locking, hooks, logging, etc. can work correctly.
221

222
    LUs which implement this method must also populate the self.needed_locks
223
    member, as a dict with lock levels as keys, and a list of needed lock names
224
    as values. Rules:
225

226
      - use an empty dict if you don't need any lock
227
      - if you don't need any lock at a particular level omit that level
228
      - don't put anything for the BGL level
229
      - if you want all locks at a level use locking.ALL_SET as a value
230

231
    If you need to share locks (rather than acquire them exclusively) at one
232
    level you can modify self.share_locks, setting a true value (usually 1) for
233
    that level. By default locks are not shared.
234

235
    This function can also define a list of tasklets, which then will be
236
    executed in order instead of the usual LU-level CheckPrereq and Exec
237
    functions, if those are not defined by the LU.
238

239
    Examples::
240

241
      # Acquire all nodes and one instance
242
      self.needed_locks = {
243
        locking.LEVEL_NODE: locking.ALL_SET,
244
        locking.LEVEL_INSTANCE: ['instance1.example.com'],
245
      }
246
      # Acquire just two nodes
247
      self.needed_locks = {
248
        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
249
      }
250
      # Acquire no locks
251
      self.needed_locks = {} # No, you can't leave it to the default value None
252

253
    """
254
    # The implementation of this method is mandatory only if the new LU is
255
    # concurrent, so that old LUs don't need to be changed all at the same
256
    # time.
257
    if self.REQ_BGL:
258
      self.needed_locks = {} # Exclusive LUs don't need locks.
259
    else:
260
      raise NotImplementedError
261

    
262
  def DeclareLocks(self, level):
263
    """Declare LU locking needs for a level
264

265
    While most LUs can just declare their locking needs at ExpandNames time,
266
    sometimes there's the need to calculate some locks after having acquired
267
    the ones before. This function is called just before acquiring locks at a
268
    particular level, but after acquiring the ones at lower levels, and permits
269
    such calculations. It can be used to modify self.needed_locks, and by
270
    default it does nothing.
271

272
    This function is only called if you have something already set in
273
    self.needed_locks for the level.
274

275
    @param level: Locking level which is going to be locked
276
    @type level: member of ganeti.locking.LEVELS
277

278
    """
279

    
280
  def CheckPrereq(self):
281
    """Check prerequisites for this LU.
282

283
    This method should check that the prerequisites for the execution
284
    of this LU are fulfilled. It can do internode communication, but
285
    it should be idempotent - no cluster or system changes are
286
    allowed.
287

288
    The method should raise errors.OpPrereqError in case something is
289
    not fulfilled. Its return value is ignored.
290

291
    This method should also update all the parameters of the opcode to
292
    their canonical form if it hasn't been done by ExpandNames before.
293

294
    """
295
    if self.tasklets is not None:
296
      for (idx, tl) in enumerate(self.tasklets):
297
        logging.debug("Checking prerequisites for tasklet %s/%s",
298
                      idx + 1, len(self.tasklets))
299
        tl.CheckPrereq()
300
    else:
301
      pass
302

    
303
  def Exec(self, feedback_fn):
304
    """Execute the LU.
305

306
    This method should implement the actual work. It should raise
307
    errors.OpExecError for failures that are somewhat dealt with in
308
    code, or expected.
309

310
    """
311
    if self.tasklets is not None:
312
      for (idx, tl) in enumerate(self.tasklets):
313
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
314
        tl.Exec(feedback_fn)
315
    else:
316
      raise NotImplementedError
317

    
318
  def BuildHooksEnv(self):
319
    """Build hooks environment for this LU.
320

321
    This method should return a three-element tuple consisting of: a dict
322
    containing the environment that will be used for running the
323
    specific hook for this LU, a list of node names on which the hook
324
    should run before the execution, and a list of node names on which
325
    the hook should run after the execution.
326

327
    The keys of the dict must not have 'GANETI_' prefixed as this will
328
    be handled in the hooks runner. Also note additional keys will be
329
    added by the hooks runner. If the LU doesn't define any
330
    environment, an empty dict (and not None) should be returned.
331

332
    If no nodes are to be returned, use an empty list (and not None).
333

334
    Note that if the HPATH for a LU class is None, this function will
335
    not be called.
336

337
    """
338
    raise NotImplementedError
339

    
340
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
341
    """Notify the LU about the results of its hooks.
342

343
    This method is called every time a hooks phase is executed, and notifies
344
    the Logical Unit about the hooks' result. The LU can then use it to alter
345
    its result based on the hooks.  By default the method does nothing and the
346
    previous result is passed back unchanged but any LU can define it if it
347
    wants to use the local cluster hook-scripts somehow.
348

349
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
350
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
351
    @param hook_results: the results of the multi-node hooks rpc call
352
    @param feedback_fn: function used to send feedback back to the caller
353
    @param lu_result: the previous Exec result this LU had, or None
354
        in the PRE phase
355
    @return: the new Exec result, based on the previous result
356
        and hook results
357

358
    """
359
    # API must be kept, thus we ignore the "unused argument" and
360
    # "could be a function" warnings
361
    # pylint: disable-msg=W0613,R0201
362
    return lu_result
363

    
364
  def _ExpandAndLockInstance(self):
365
    """Helper function to expand and lock an instance.
366

367
    Many LUs that work on an instance take its name in self.op.instance_name
368
    and need to expand it and then declare the expanded name for locking. This
369
    function does it, and then updates self.op.instance_name to the expanded
370
    name. It also initializes needed_locks as a dict, if this hasn't been done
371
    before.
372

373
    """
374
    if self.needed_locks is None:
375
      self.needed_locks = {}
376
    else:
377
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
378
        "_ExpandAndLockInstance called with instance-level locks set"
379
    self.op.instance_name = _ExpandInstanceName(self.cfg,
380
                                                self.op.instance_name)
381
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
382

    
383
  def _LockInstancesNodes(self, primary_only=False):
384
    """Helper function to declare instances' nodes for locking.
385

386
    This function should be called after locking one or more instances to lock
387
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
388
    with all primary or secondary nodes for instances already locked and
389
    present in self.needed_locks[locking.LEVEL_INSTANCE].
390

391
    It should be called from DeclareLocks, and for safety only works if
392
    self.recalculate_locks[locking.LEVEL_NODE] is set.
393

394
    In the future it may grow parameters to just lock some instance's nodes, or
395
    to just lock primaries or secondary nodes, if needed.
396

397
    It should be called from DeclareLocks in a way similar to::
398

399
      if level == locking.LEVEL_NODE:
400
        self._LockInstancesNodes()
401

402
    @type primary_only: boolean
403
    @param primary_only: only lock primary nodes of locked instances
404

405
    """
406
    assert locking.LEVEL_NODE in self.recalculate_locks, \
407
      "_LockInstancesNodes helper function called with no nodes to recalculate"
408

    
409
    # TODO: check if we really have been called with the instance locks held
410

    
411
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
412
    # future we might want to have different behaviors depending on the value
413
    # of self.recalculate_locks[locking.LEVEL_NODE]
414
    wanted_nodes = []
415
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
416
      instance = self.context.cfg.GetInstanceInfo(instance_name)
417
      wanted_nodes.append(instance.primary_node)
418
      if not primary_only:
419
        wanted_nodes.extend(instance.secondary_nodes)
420

    
421
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
422
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
423
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
424
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
425

    
426
    del self.recalculate_locks[locking.LEVEL_NODE]
427

    
428

    
429
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
430
  """Simple LU which runs no hooks.
431

432
  This LU is intended as a parent for other LogicalUnits which will
433
  run no hooks, in order to reduce duplicate code.
434

435
  """
436
  HPATH = None
437
  HTYPE = None
438

    
439
  def BuildHooksEnv(self):
440
    """Empty BuildHooksEnv for NoHooksLu.
441

442
    This just raises an error.
443

444
    """
445
    assert False, "BuildHooksEnv called for NoHooksLUs"
446

    
447

    
448
class Tasklet:
449
  """Tasklet base class.
450

451
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
452
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
453
  tasklets know nothing about locks.
454

455
  Subclasses must follow these rules:
456
    - Implement CheckPrereq
457
    - Implement Exec
458

459
  """
460
  def __init__(self, lu):
461
    self.lu = lu
462

    
463
    # Shortcuts
464
    self.cfg = lu.cfg
465
    self.rpc = lu.rpc
466

    
467
  def CheckPrereq(self):
468
    """Check prerequisites for this tasklets.
469

470
    This method should check whether the prerequisites for the execution of
471
    this tasklet are fulfilled. It can do internode communication, but it
472
    should be idempotent - no cluster or system changes are allowed.
473

474
    The method should raise errors.OpPrereqError in case something is not
475
    fulfilled. Its return value is ignored.
476

477
    This method should also update all parameters to their canonical form if it
478
    hasn't been done before.
479

480
    """
481
    pass
482

    
483
  def Exec(self, feedback_fn):
484
    """Execute the tasklet.
485

486
    This method should implement the actual work. It should raise
487
    errors.OpExecError for failures that are somewhat dealt with in code, or
488
    expected.
489

490
    """
491
    raise NotImplementedError
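  # Hypothetical sketch of the contract described in the class docstring; the
  # name and behaviour below are invented for illustration only:
  #
  #   class _NoopTasklet(Tasklet):
  #     def CheckPrereq(self):
  #       pass                      # must stay idempotent, no cluster changes
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn("noop tasklet executed")
  #
  # An LU would typically set self.tasklets = [_NoopTasklet(self)] in
  # ExpandNames, after which the base LogicalUnit.CheckPrereq and Exec simply
  # iterate over the list.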
492

    
493

    
494
class _QueryBase:
495
  """Base for query utility classes.
496

497
  """
498
  #: Attribute holding field definitions
499
  FIELDS = None
500

    
501
  def __init__(self, names, fields, use_locking):
502
    """Initializes this class.
503

504
    """
505
    self.names = names
506
    self.use_locking = use_locking
507

    
508
    self.query = query.Query(self.FIELDS, fields)
509
    self.requested_data = self.query.RequestedData()
510

    
511
  @classmethod
512
  def FieldsQuery(cls, fields):
513
    """Returns list of available fields.
514

515
    @return: List of L{objects.QueryFieldDefinition}
516

517
    """
518
    if fields is None:
519
      # Client requests all fields
520
      fdefs = query.GetAllFields(cls.FIELDS.values())
521
    else:
522
      fdefs = query.Query(cls.FIELDS, fields).GetFields()
523

    
524
    return {
525
      "fields": [fdef.ToDict() for fdef in fdefs],
526
      }
527

    
528
  def ExpandNames(self, lu):
529
    """Expand names for this query.
530

531
    See L{LogicalUnit.ExpandNames}.
532

533
    """
534
    raise NotImplementedError()
535

    
536
  def DeclareLocks(self, level):
537
    """Declare locks for this query.
538

539
    See L{LogicalUnit.DeclareLocks}.
540

541
    """
542
    raise NotImplementedError()
543

    
544
  def _GetQueryData(self, lu):
545
    """Collects all data for this query.
546

547
    @return: Query data object
548

549
    """
550
    raise NotImplementedError()
551

    
552
  def NewStyleQuery(self, lu):
553
    """Collect data and execute query.
554

555
    """
556
    data = self._GetQueryData(lu)
557

    
558
    return {
559
      "data": self.query.Query(data),
560
      "fields": [fdef.ToDict()
561
                 for fdef in self.query.GetFields()],
562
      }
563

    
564
  def OldStyleQuery(self, lu):
565
    """Collect data and execute query.
566

567
    """
568
    return self.query.OldStyleQuery(self._GetQueryData(lu))
569

    
570

    
571
def _GetWantedNodes(lu, nodes):
572
  """Returns list of checked and expanded node names.
573

574
  @type lu: L{LogicalUnit}
575
  @param lu: the logical unit on whose behalf we execute
576
  @type nodes: list
577
  @param nodes: list of node names or None for all nodes
578
  @rtype: list
579
  @return: the list of nodes, sorted
580
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
581

582
  """
583
  if nodes:
584
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
585

    
586
  return utils.NiceSort(lu.cfg.GetNodeList())
587

    
588

    
589
def _GetWantedInstances(lu, instances):
590
  """Returns list of checked and expanded instance names.
591

592
  @type lu: L{LogicalUnit}
593
  @param lu: the logical unit on whose behalf we execute
594
  @type instances: list
595
  @param instances: list of instance names or None for all instances
596
  @rtype: list
597
  @return: the list of instances, sorted
598
  @raise errors.OpPrereqError: if the instances parameter is wrong type
599
  @raise errors.OpPrereqError: if any of the passed instances is not found
600

601
  """
602
  if instances:
603
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
604
  else:
605
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
606
  return wanted
607

    
608

    
609
def _GetUpdatedParams(old_params, update_dict,
610
                      use_default=True, use_none=False):
611
  """Return the new version of a parameter dictionary.
612

613
  @type old_params: dict
614
  @param old_params: old parameters
615
  @type update_dict: dict
616
  @param update_dict: dict containing new parameter values, or
617
      constants.VALUE_DEFAULT to reset the parameter to its default
618
      value
619
  @type use_default: boolean
620
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
621
      values as 'to be deleted' values
622
  @type use_none: boolean
623
  @param use_none: whether to recognise C{None} values as 'to be
624
      deleted' values
625
  @rtype: dict
626
  @return: the new parameter dictionary
627

628
  """
629
  params_copy = copy.deepcopy(old_params)
630
  for key, val in update_dict.iteritems():
631
    if ((use_default and val == constants.VALUE_DEFAULT) or
632
        (use_none and val is None)):
633
      try:
634
        del params_copy[key]
635
      except KeyError:
636
        pass
637
    else:
638
      params_copy[key] = val
639
  return params_copy
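# Illustrative sketch of _GetUpdatedParams (parameter names/values invented):
#
#   old = {"memory": 128, "vcpus": 2}
#   upd = {"memory": constants.VALUE_DEFAULT, "vcpus": 4, "auto_balance": True}
#   _GetUpdatedParams(old, upd)
#   # => {"vcpus": 4, "auto_balance": True}
#
# With use_none=True an explicit None in the update dict deletes the key in
# the same way; the input dict is never modified thanks to the deepcopy.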
640

    
641

    
642
def _CheckOutputFields(static, dynamic, selected):
643
  """Checks whether all selected fields are valid.
644

645
  @type static: L{utils.FieldSet}
646
  @param static: static fields set
647
  @type dynamic: L{utils.FieldSet}
648
  @param dynamic: dynamic fields set
649

650
  """
651
  f = utils.FieldSet()
652
  f.Extend(static)
653
  f.Extend(dynamic)
654

    
655
  delta = f.NonMatching(selected)
656
  if delta:
657
    raise errors.OpPrereqError("Unknown output fields selected: %s"
658
                               % ",".join(delta), errors.ECODE_INVAL)
659

    
660

    
661
def _CheckGlobalHvParams(params):
662
  """Validates that given hypervisor params are not global ones.
663

664
  This will ensure that instances don't get customised versions of
665
  global params.
666

667
  """
668
  used_globals = constants.HVC_GLOBALS.intersection(params)
669
  if used_globals:
670
    msg = ("The following hypervisor parameters are global and cannot"
671
           " be customized at instance level, please modify them at"
672
           " cluster level: %s" % utils.CommaJoin(used_globals))
673
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
674

    
675

    
676
def _CheckNodeOnline(lu, node, msg=None):
677
  """Ensure that a given node is online.
678

679
  @param lu: the LU on behalf of which we make the check
680
  @param node: the node to check
681
  @param msg: if passed, should be a message to replace the default one
682
  @raise errors.OpPrereqError: if the node is offline
683

684
  """
685
  if msg is None:
686
    msg = "Can't use offline node"
687
  if lu.cfg.GetNodeInfo(node).offline:
688
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
689

    
690

    
691
def _CheckNodeNotDrained(lu, node):
692
  """Ensure that a given node is not drained.
693

694
  @param lu: the LU on behalf of which we make the check
695
  @param node: the node to check
696
  @raise errors.OpPrereqError: if the node is drained
697

698
  """
699
  if lu.cfg.GetNodeInfo(node).drained:
700
    raise errors.OpPrereqError("Can't use drained node %s" % node,
701
                               errors.ECODE_STATE)
702

    
703

    
704
def _CheckNodeVmCapable(lu, node):
705
  """Ensure that a given node is vm capable.
706

707
  @param lu: the LU on behalf of which we make the check
708
  @param node: the node to check
709
  @raise errors.OpPrereqError: if the node is not vm capable
710

711
  """
712
  if not lu.cfg.GetNodeInfo(node).vm_capable:
713
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
714
                               errors.ECODE_STATE)
715

    
716

    
717
def _CheckNodeHasOS(lu, node, os_name, force_variant):
718
  """Ensure that a node supports a given OS.
719

720
  @param lu: the LU on behalf of which we make the check
721
  @param node: the node to check
722
  @param os_name: the OS to query about
723
  @param force_variant: whether to ignore variant errors
724
  @raise errors.OpPrereqError: if the node is not supporting the OS
725

726
  """
727
  result = lu.rpc.call_os_get(node, os_name)
728
  result.Raise("OS '%s' not in supported OS list for node %s" %
729
               (os_name, node),
730
               prereq=True, ecode=errors.ECODE_INVAL)
731
  if not force_variant:
732
    _CheckOSVariant(result.payload, os_name)
733

    
734

    
735
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
736
  """Ensure that a node has the given secondary ip.
737

738
  @type lu: L{LogicalUnit}
739
  @param lu: the LU on behalf of which we make the check
740
  @type node: string
741
  @param node: the node to check
742
  @type secondary_ip: string
743
  @param secondary_ip: the ip to check
744
  @type prereq: boolean
745
  @param prereq: whether to throw a prerequisite or an execute error
746
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
747
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
748

749
  """
750
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
751
  result.Raise("Failure checking secondary ip on node %s" % node,
752
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
753
  if not result.payload:
754
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
755
           " please fix and re-run this command" % secondary_ip)
756
    if prereq:
757
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
758
    else:
759
      raise errors.OpExecError(msg)
760

    
761

    
762
def _RequireFileStorage():
763
  """Checks that file storage is enabled.
764

765
  @raise errors.OpPrereqError: when file storage is disabled
766

767
  """
768
  if not constants.ENABLE_FILE_STORAGE:
769
    raise errors.OpPrereqError("File storage disabled at configure time",
770
                               errors.ECODE_INVAL)
771

    
772

    
773
def _CheckDiskTemplate(template):
774
  """Ensure a given disk template is valid.
775

776
  """
777
  if template not in constants.DISK_TEMPLATES:
778
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
779
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
780
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
781
  if template == constants.DT_FILE:
782
    _RequireFileStorage()
783
  return True
784

    
785

    
786
def _CheckStorageType(storage_type):
787
  """Ensure a given storage type is valid.
788

789
  """
790
  if storage_type not in constants.VALID_STORAGE_TYPES:
791
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
792
                               errors.ECODE_INVAL)
793
  if storage_type == constants.ST_FILE:
794
    _RequireFileStorage()
795
  return True
796

    
797

    
798
def _GetClusterDomainSecret():
799
  """Reads the cluster domain secret.
800

801
  """
802
  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
803
                               strict=True)
804

    
805

    
806
def _CheckInstanceDown(lu, instance, reason):
807
  """Ensure that an instance is not running."""
808
  if instance.admin_up:
809
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
810
                               (instance.name, reason), errors.ECODE_STATE)
811

    
812
  pnode = instance.primary_node
813
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
814
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
815
              prereq=True, ecode=errors.ECODE_ENVIRON)
816

    
817
  if instance.name in ins_l.payload:
818
    raise errors.OpPrereqError("Instance %s is running, %s" %
819
                               (instance.name, reason), errors.ECODE_STATE)
820

    
821

    
822
def _ExpandItemName(fn, name, kind):
823
  """Expand an item name.
824

825
  @param fn: the function to use for expansion
826
  @param name: requested item name
827
  @param kind: text description ('Node' or 'Instance')
828
  @return: the resolved (full) name
829
  @raise errors.OpPrereqError: if the item is not found
830

831
  """
832
  full_name = fn(name)
833
  if full_name is None:
834
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
835
                               errors.ECODE_NOENT)
836
  return full_name
837

    
838

    
839
def _ExpandNodeName(cfg, name):
840
  """Wrapper over L{_ExpandItemName} for nodes."""
841
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
842

    
843

    
844
def _ExpandInstanceName(cfg, name):
845
  """Wrapper over L{_ExpandItemName} for instance."""
846
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
847

    
848

    
849
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
850
                          memory, vcpus, nics, disk_template, disks,
851
                          bep, hvp, hypervisor_name):
852
  """Builds instance related env variables for hooks
853

854
  This builds the hook environment from individual variables.
855

856
  @type name: string
857
  @param name: the name of the instance
858
  @type primary_node: string
859
  @param primary_node: the name of the instance's primary node
860
  @type secondary_nodes: list
861
  @param secondary_nodes: list of secondary nodes as strings
862
  @type os_type: string
863
  @param os_type: the name of the instance's OS
864
  @type status: boolean
865
  @param status: the should_run status of the instance
866
  @type memory: string
867
  @param memory: the memory size of the instance
868
  @type vcpus: string
869
  @param vcpus: the count of VCPUs the instance has
870
  @type nics: list
871
  @param nics: list of tuples (ip, mac, mode, link) representing
872
      the NICs the instance has
873
  @type disk_template: string
874
  @param disk_template: the disk template of the instance
875
  @type disks: list
876
  @param disks: the list of (size, mode) pairs
877
  @type bep: dict
878
  @param bep: the backend parameters for the instance
879
  @type hvp: dict
880
  @param hvp: the hypervisor parameters for the instance
881
  @type hypervisor_name: string
882
  @param hypervisor_name: the hypervisor for the instance
883
  @rtype: dict
884
  @return: the hook environment for this instance
885

886
  """
887
  if status:
888
    str_status = "up"
889
  else:
890
    str_status = "down"
891
  env = {
892
    "OP_TARGET": name,
893
    "INSTANCE_NAME": name,
894
    "INSTANCE_PRIMARY": primary_node,
895
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
896
    "INSTANCE_OS_TYPE": os_type,
897
    "INSTANCE_STATUS": str_status,
898
    "INSTANCE_MEMORY": memory,
899
    "INSTANCE_VCPUS": vcpus,
900
    "INSTANCE_DISK_TEMPLATE": disk_template,
901
    "INSTANCE_HYPERVISOR": hypervisor_name,
902
  }
903

    
904
  if nics:
905
    nic_count = len(nics)
906
    for idx, (ip, mac, mode, link) in enumerate(nics):
907
      if ip is None:
908
        ip = ""
909
      env["INSTANCE_NIC%d_IP" % idx] = ip
910
      env["INSTANCE_NIC%d_MAC" % idx] = mac
911
      env["INSTANCE_NIC%d_MODE" % idx] = mode
912
      env["INSTANCE_NIC%d_LINK" % idx] = link
913
      if mode == constants.NIC_MODE_BRIDGED:
914
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
915
  else:
916
    nic_count = 0
917

    
918
  env["INSTANCE_NIC_COUNT"] = nic_count
919

    
920
  if disks:
921
    disk_count = len(disks)
922
    for idx, (size, mode) in enumerate(disks):
923
      env["INSTANCE_DISK%d_SIZE" % idx] = size
924
      env["INSTANCE_DISK%d_MODE" % idx] = mode
925
  else:
926
    disk_count = 0
927

    
928
  env["INSTANCE_DISK_COUNT"] = disk_count
929

    
930
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
931
    for key, value in source.items():
932
      env["INSTANCE_%s_%s" % (kind, key)] = value
933

    
934
  return env
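# Sketch of the environment built above for a hypothetical one-NIC, one-disk
# instance (all values invented for illustration):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 1024,
#     ...
#   }
#
# plus one INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter.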
935

    
936

    
937
def _NICListToTuple(lu, nics):
938
  """Build a list of nic information tuples.
939

940
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
941
  value in LUQueryInstanceData.
942

943
  @type lu:  L{LogicalUnit}
944
  @param lu: the logical unit on whose behalf we execute
945
  @type nics: list of L{objects.NIC}
946
  @param nics: list of nics to convert to hooks tuples
947

948
  """
949
  hooks_nics = []
950
  cluster = lu.cfg.GetClusterInfo()
951
  for nic in nics:
952
    ip = nic.ip
953
    mac = nic.mac
954
    filled_params = cluster.SimpleFillNIC(nic.nicparams)
955
    mode = filled_params[constants.NIC_MODE]
956
    link = filled_params[constants.NIC_LINK]
957
    hooks_nics.append((ip, mac, mode, link))
958
  return hooks_nics
959

    
960

    
961
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
962
  """Builds instance related env variables for hooks from an object.
963

964
  @type lu: L{LogicalUnit}
965
  @param lu: the logical unit on whose behalf we execute
966
  @type instance: L{objects.Instance}
967
  @param instance: the instance for which we should build the
968
      environment
969
  @type override: dict
970
  @param override: dictionary with key/values that will override
971
      our values
972
  @rtype: dict
973
  @return: the hook environment dictionary
974

975
  """
976
  cluster = lu.cfg.GetClusterInfo()
977
  bep = cluster.FillBE(instance)
978
  hvp = cluster.FillHV(instance)
979
  args = {
980
    'name': instance.name,
981
    'primary_node': instance.primary_node,
982
    'secondary_nodes': instance.secondary_nodes,
983
    'os_type': instance.os,
984
    'status': instance.admin_up,
985
    'memory': bep[constants.BE_MEMORY],
986
    'vcpus': bep[constants.BE_VCPUS],
987
    'nics': _NICListToTuple(lu, instance.nics),
988
    'disk_template': instance.disk_template,
989
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
990
    'bep': bep,
991
    'hvp': hvp,
992
    'hypervisor_name': instance.hypervisor,
993
  }
994
  if override:
995
    args.update(override)
996
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
997

    
998

    
999
def _AdjustCandidatePool(lu, exceptions):
1000
  """Adjust the candidate pool after node operations.
1001

1002
  """
1003
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1004
  if mod_list:
1005
    lu.LogInfo("Promoted nodes to master candidate role: %s",
1006
               utils.CommaJoin(node.name for node in mod_list))
1007
    for name in mod_list:
1008
      lu.context.ReaddNode(name)
1009
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1010
  if mc_now > mc_max:
1011
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1012
               (mc_now, mc_max))
1013

    
1014

    
1015
def _DecideSelfPromotion(lu, exceptions=None):
1016
  """Decide whether I should promote myself as a master candidate.
1017

1018
  """
1019
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1020
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1021
  # the new node will increase mc_max with one, so:
1022
  mc_should = min(mc_should + 1, cp_size)
1023
  return mc_now < mc_should
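# Worked example for _DecideSelfPromotion (numbers invented): with
# candidate_pool_size=10, mc_now=3 and mc_should=3, the node being added bumps
# the target to min(3 + 1, 10) == 4, and 3 < 4 means it should promote itself.
# Once mc_now already equals the (clamped) pool size, the comparison returns
# False and no self-promotion happens.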
1024

    
1025

    
1026
def _CheckNicsBridgesExist(lu, target_nics, target_node):
1027
  """Check that the brigdes needed by a list of nics exist.
1028

1029
  """
1030
  cluster = lu.cfg.GetClusterInfo()
1031
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1032
  brlist = [params[constants.NIC_LINK] for params in paramslist
1033
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1034
  if brlist:
1035
    result = lu.rpc.call_bridges_exist(target_node, brlist)
1036
    result.Raise("Error checking bridges on destination node '%s'" %
1037
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1038

    
1039

    
1040
def _CheckInstanceBridgesExist(lu, instance, node=None):
1041
  """Check that the brigdes needed by an instance exist.
1042

1043
  """
1044
  if node is None:
1045
    node = instance.primary_node
1046
  _CheckNicsBridgesExist(lu, instance.nics, node)
1047

    
1048

    
1049
def _CheckOSVariant(os_obj, name):
1050
  """Check whether an OS name conforms to the os variants specification.
1051

1052
  @type os_obj: L{objects.OS}
1053
  @param os_obj: OS object to check
1054
  @type name: string
1055
  @param name: OS name passed by the user, to check for validity
1056

1057
  """
1058
  if not os_obj.supported_variants:
1059
    return
1060
  variant = objects.OS.GetVariant(name)
1061
  if not variant:
1062
    raise errors.OpPrereqError("OS name must include a variant",
1063
                               errors.ECODE_INVAL)
1064

    
1065
  if variant not in os_obj.supported_variants:
1066
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1067

    
1068

    
1069
def _GetNodeInstancesInner(cfg, fn):
1070
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1071

    
1072

    
1073
def _GetNodeInstances(cfg, node_name):
1074
  """Returns a list of all primary and secondary instances on a node.
1075

1076
  """
1077

    
1078
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1079

    
1080

    
1081
def _GetNodePrimaryInstances(cfg, node_name):
1082
  """Returns primary instances on a node.
1083

1084
  """
1085
  return _GetNodeInstancesInner(cfg,
1086
                                lambda inst: node_name == inst.primary_node)
1087

    
1088

    
1089
def _GetNodeSecondaryInstances(cfg, node_name):
1090
  """Returns secondary instances on a node.
1091

1092
  """
1093
  return _GetNodeInstancesInner(cfg,
1094
                                lambda inst: node_name in inst.secondary_nodes)
1095

    
1096

    
1097
def _GetStorageTypeArgs(cfg, storage_type):
1098
  """Returns the arguments for a storage type.
1099

1100
  """
1101
  # Special case for file storage
1102
  if storage_type == constants.ST_FILE:
1103
    # storage.FileStorage wants a list of storage directories
1104
    return [[cfg.GetFileStorageDir()]]
1105

    
1106
  return []
1107

    
1108

    
1109
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
1110
  faulty = []
1111

    
1112
  for dev in instance.disks:
1113
    cfg.SetDiskID(dev, node_name)
1114

    
1115
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
1116
  result.Raise("Failed to get disk status from node %s" % node_name,
1117
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1118

    
1119
  for idx, bdev_status in enumerate(result.payload):
1120
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1121
      faulty.append(idx)
1122

    
1123
  return faulty
1124

    
1125

    
1126
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1127
  """Check the sanity of iallocator and node arguments and use the
1128
  cluster-wide iallocator if appropriate.
1129

1130
  Check that at most one of (iallocator, node) is specified. If none is
1131
  specified, then the LU's opcode's iallocator slot is filled with the
1132
  cluster-wide default iallocator.
1133

1134
  @type iallocator_slot: string
1135
  @param iallocator_slot: the name of the opcode iallocator slot
1136
  @type node_slot: string
1137
  @param node_slot: the name of the opcode target node slot
1138

1139
  """
1140
  node = getattr(lu.op, node_slot, None)
1141
  iallocator = getattr(lu.op, iallocator_slot, None)
1142

    
1143
  if node is not None and iallocator is not None:
1144
    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
1145
                               errors.ECODE_INVAL)
1146
  elif node is None and iallocator is None:
1147
    default_iallocator = lu.cfg.GetDefaultIAllocator()
1148
    if default_iallocator:
1149
      setattr(lu.op, iallocator_slot, default_iallocator)
1150
    else:
1151
      raise errors.OpPrereqError("No iallocator or node given and no"
1152
                                 " cluster-wide default iallocator found."
1153
                                 " Please specify either an iallocator or a"
1154
                                 " node, or set a cluster-wide default"
1155
                                 " iallocator.")
1156

    
1157

    
1158
class LUPostInitCluster(LogicalUnit):
1159
  """Logical unit for running hooks after cluster initialization.
1160

1161
  """
1162
  HPATH = "cluster-init"
1163
  HTYPE = constants.HTYPE_CLUSTER
1164

    
1165
  def BuildHooksEnv(self):
1166
    """Build hooks env.
1167

1168
    """
1169
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1170
    mn = self.cfg.GetMasterNode()
1171
    return env, [], [mn]
1172

    
1173
  def Exec(self, feedback_fn):
1174
    """Nothing to do.
1175

1176
    """
1177
    return True
1178

    
1179

    
1180
class LUDestroyCluster(LogicalUnit):
1181
  """Logical unit for destroying the cluster.
1182

1183
  """
1184
  HPATH = "cluster-destroy"
1185
  HTYPE = constants.HTYPE_CLUSTER
1186

    
1187
  def BuildHooksEnv(self):
1188
    """Build hooks env.
1189

1190
    """
1191
    env = {"OP_TARGET": self.cfg.GetClusterName()}
1192
    return env, [], []
1193

    
1194
  def CheckPrereq(self):
1195
    """Check prerequisites.
1196

1197
    This checks whether the cluster is empty.
1198

1199
    Any errors are signaled by raising errors.OpPrereqError.
1200

1201
    """
1202
    master = self.cfg.GetMasterNode()
1203

    
1204
    nodelist = self.cfg.GetNodeList()
1205
    if len(nodelist) != 1 or nodelist[0] != master:
1206
      raise errors.OpPrereqError("There are still %d node(s) in"
1207
                                 " this cluster." % (len(nodelist) - 1),
1208
                                 errors.ECODE_INVAL)
1209
    instancelist = self.cfg.GetInstanceList()
1210
    if instancelist:
1211
      raise errors.OpPrereqError("There are still %d instance(s) in"
1212
                                 " this cluster." % len(instancelist),
1213
                                 errors.ECODE_INVAL)
1214

    
1215
  def Exec(self, feedback_fn):
1216
    """Destroys the cluster.
1217

1218
    """
1219
    master = self.cfg.GetMasterNode()
1220

    
1221
    # Run post hooks on master node before it's removed
1222
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
1223
    try:
1224
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
1225
    except:
1226
      # pylint: disable-msg=W0702
1227
      self.LogWarning("Errors occurred running hooks on %s" % master)
1228

    
1229
    result = self.rpc.call_node_stop_master(master, False)
1230
    result.Raise("Could not disable the master role")
1231

    
1232
    return master
1233

    
1234

    
1235
def _VerifyCertificate(filename):
1236
  """Verifies a certificate for LUVerifyCluster.
1237

1238
  @type filename: string
1239
  @param filename: Path to PEM file
1240

1241
  """
1242
  try:
1243
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1244
                                           utils.ReadFile(filename))
1245
  except Exception, err: # pylint: disable-msg=W0703
1246
    return (LUVerifyCluster.ETYPE_ERROR,
1247
            "Failed to load X509 certificate %s: %s" % (filename, err))
1248

    
1249
  (errcode, msg) = \
1250
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1251
                                constants.SSL_CERT_EXPIRATION_ERROR)
1252

    
1253
  if msg:
1254
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1255
  else:
1256
    fnamemsg = None
1257

    
1258
  if errcode is None:
1259
    return (None, fnamemsg)
1260
  elif errcode == utils.CERT_WARNING:
1261
    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
1262
  elif errcode == utils.CERT_ERROR:
1263
    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
1264

    
1265
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1266

    
1267

    
1268
class LUVerifyCluster(LogicalUnit):
1269
  """Verifies the cluster status.
1270

1271
  """
1272
  HPATH = "cluster-verify"
1273
  HTYPE = constants.HTYPE_CLUSTER
1274
  _OP_PARAMS = [
1275
    ("skip_checks", ht.EmptyList,
1276
     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
1277
    ("verbose", False, ht.TBool),
1278
    ("error_codes", False, ht.TBool),
1279
    ("debug_simulate_errors", False, ht.TBool),
1280
    ]
1281
  REQ_BGL = False
1282

    
1283
  TCLUSTER = "cluster"
1284
  TNODE = "node"
1285
  TINSTANCE = "instance"
1286

    
1287
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
1288
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
1289
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
1290
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
1291
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
1292
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
1293
  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
1294
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
1295
  ENODEDRBD = (TNODE, "ENODEDRBD")
1296
  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
1297
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
1298
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
1299
  ENODEHV = (TNODE, "ENODEHV")
1300
  ENODELVM = (TNODE, "ENODELVM")
1301
  ENODEN1 = (TNODE, "ENODEN1")
1302
  ENODENET = (TNODE, "ENODENET")
1303
  ENODEOS = (TNODE, "ENODEOS")
1304
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
1305
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
1306
  ENODERPC = (TNODE, "ENODERPC")
1307
  ENODESSH = (TNODE, "ENODESSH")
1308
  ENODEVERSION = (TNODE, "ENODEVERSION")
1309
  ENODESETUP = (TNODE, "ENODESETUP")
1310
  ENODETIME = (TNODE, "ENODETIME")
1311

    
1312
  ETYPE_FIELD = "code"
1313
  ETYPE_ERROR = "ERROR"
1314
  ETYPE_WARNING = "WARNING"
1315

    
1316
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1317

    
1318
  class NodeImage(object):
1319
    """A class representing the logical and physical status of a node.
1320

1321
    @type name: string
1322
    @ivar name: the node name to which this object refers
1323
    @ivar volumes: a structure as returned from
1324
        L{ganeti.backend.GetVolumeList} (runtime)
1325
    @ivar instances: a list of running instances (runtime)
1326
    @ivar pinst: list of configured primary instances (config)
1327
    @ivar sinst: list of configured secondary instances (config)
1328
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
1329
        of this node (config)
1330
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1331
    @ivar dfree: free disk, as reported by the node (runtime)
1332
    @ivar offline: the offline status (config)
1333
    @type rpc_fail: boolean
1334
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1335
        not whether the individual keys were correct) (runtime)
1336
    @type lvm_fail: boolean
1337
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1338
    @type hyp_fail: boolean
1339
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1340
    @type ghost: boolean
1341
    @ivar ghost: whether this is a known node or not (config)
1342
    @type os_fail: boolean
1343
    @ivar os_fail: whether the RPC call didn't return valid OS data
1344
    @type oslist: list
1345
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1346
    @type vm_capable: boolean
1347
    @ivar vm_capable: whether the node can host instances
1348

1349
    """
1350
    def __init__(self, offline=False, name=None, vm_capable=True):
1351
      self.name = name
1352
      self.volumes = {}
1353
      self.instances = []
1354
      self.pinst = []
1355
      self.sinst = []
1356
      self.sbp = {}
1357
      self.mfree = 0
1358
      self.dfree = 0
1359
      self.offline = offline
1360
      self.vm_capable = vm_capable
1361
      self.rpc_fail = False
1362
      self.lvm_fail = False
1363
      self.hyp_fail = False
1364
      self.ghost = False
1365
      self.os_fail = False
1366
      self.oslist = {}
1367

    
1368
  def ExpandNames(self):
1369
    self.needed_locks = {
1370
      locking.LEVEL_NODE: locking.ALL_SET,
1371
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1372
    }
1373
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1374

    
1375
  def _Error(self, ecode, item, msg, *args, **kwargs):
1376
    """Format an error message.
1377

1378
    Based on the opcode's error_codes parameter, either format a
1379
    parseable error code, or a simpler error string.
1380

1381
    This must be called only from Exec and functions called from Exec.
1382

1383
    """
1384
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1385
    itype, etxt = ecode
1386
    # first complete the msg
1387
    if args:
1388
      msg = msg % args
1389
    # then format the whole message
1390
    if self.op.error_codes:
1391
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1392
    else:
1393
      if item:
1394
        item = " " + item
1395
      else:
1396
        item = ""
1397
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1398
    # and finally report it via the feedback_fn
1399
    self._feedback_fn("  - %s" % msg)
1400

    
1401
  def _ErrorIf(self, cond, *args, **kwargs):
1402
    """Log an error message if the passed condition is True.
1403

1404
    """
1405
    cond = bool(cond) or self.op.debug_simulate_errors
1406
    if cond:
1407
      self._Error(*args, **kwargs)
1408
    # do not mark the operation as failed for WARN cases only
1409
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1410
      self.bad = self.bad or cond
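  # Sketch of the two message formats produced by _Error above (instance name
  # and message invented): _ErrorIf(True, self.EINSTANCEDOWN, "inst1",
  # "instance not running") feeds back
  #   - ERROR: instance inst1: instance not running
  # by default, or the machine-parseable
  #   - ERROR:EINSTANCEDOWN:instance:inst1:instance not running
  # when the opcode sets error_codes=True.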
1411

    
1412
  def _VerifyNode(self, ninfo, nresult):
1413
    """Perform some basic validation on data returned from a node.
1414

1415
      - check the result data structure is well formed and has all the
1416
        mandatory fields
1417
      - check ganeti version
1418

1419
    @type ninfo: L{objects.Node}
1420
    @param ninfo: the node to check
1421
    @param nresult: the results from the node
1422
    @rtype: boolean
1423
    @return: whether overall this call was successful (and we can expect
1424
         reasonable values in the response)
1425

1426
    """
1427
    node = ninfo.name
1428
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1429

    
1430
    # main result, nresult should be a non-empty dict
1431
    test = not nresult or not isinstance(nresult, dict)
1432
    _ErrorIf(test, self.ENODERPC, node,
1433
                  "unable to verify node: no data returned")
1434
    if test:
1435
      return False
1436

    
1437
    # compares ganeti version
1438
    local_version = constants.PROTOCOL_VERSION
1439
    remote_version = nresult.get("version", None)
1440
    test = not (remote_version and
1441
                isinstance(remote_version, (list, tuple)) and
1442
                len(remote_version) == 2)
1443
    _ErrorIf(test, self.ENODERPC, node,
1444
             "connection to node returned invalid data")
1445
    if test:
1446
      return False
1447

    
1448
    test = local_version != remote_version[0]
1449
    _ErrorIf(test, self.ENODEVERSION, node,
1450
             "incompatible protocol versions: master %s,"
1451
             " node %s", local_version, remote_version[0])
1452
    if test:
1453
      return False
1454

    
1455
    # node seems compatible, we can actually try to look into its results
1456

    
1457
    # full package version
1458
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1459
                  self.ENODEVERSION, node,
1460
                  "software version mismatch: master %s, node %s",
1461
                  constants.RELEASE_VERSION, remote_version[1],
1462
                  code=self.ETYPE_WARNING)
1463

    
1464
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1465
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1466
      for hv_name, hv_result in hyp_result.iteritems():
1467
        test = hv_result is not None
1468
        _ErrorIf(test, self.ENODEHV, node,
1469
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1470

    
1471
    test = nresult.get(constants.NV_NODESETUP,
1472
                           ["Missing NODESETUP results"])
1473
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1474
             "; ".join(test))
1475

    
1476
    return True
1477

    
1478
  def _VerifyNodeTime(self, ninfo, nresult,
1479
                      nvinfo_starttime, nvinfo_endtime):
1480
    """Check the node time.
1481

1482
    @type ninfo: L{objects.Node}
1483
    @param ninfo: the node to check
1484
    @param nresult: the remote results for the node
1485
    @param nvinfo_starttime: the start time of the RPC call
1486
    @param nvinfo_endtime: the end time of the RPC call
1487

1488
    """
1489
    node = ninfo.name
1490
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1491

    
1492
    ntime = nresult.get(constants.NV_TIME, None)
1493
    try:
1494
      ntime_merged = utils.MergeTime(ntime)
1495
    except (ValueError, TypeError):
1496
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1497
      return
1498

    
1499
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1500
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1501
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1502
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1503
    else:
1504
      ntime_diff = None
1505

    
1506
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1507
             "Node time diverges by at least %s from master node time",
1508
             ntime_diff)
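    # Worked example of the skew check above (times invented, and
    # NODE_MAX_CLOCK_SKEW taken as 150 seconds for the example): with
    # nvinfo_starttime = 1000.0, nvinfo_endtime = 1002.0 and a node-reported
    # ntime_merged = 840.0, 840.0 < (1000.0 - 150) holds, so ntime_diff
    # becomes "160.0s" and an ENODETIME error is reported; any answer inside
    # [850.0, 1152.0] passes.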
1509

    
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, self.ENODENET, node, msg)

  def _VerifyInstance(self, instance, instanceconfig, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

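    # diskstatus maps node name to a list of per-disk (success, status)
    # pairs; flatten it into (node, success, status, disk index) tuples so
    # the checks below can iterate over individual disks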
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      _ErrorIf(instanceconfig.admin_up and not success,
               self.EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((instanceconfig.admin_up and success and
                bdev_status.ldisk_status == constants.LDS_FAULTY),
               self.EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
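      # n_img.sbp maps each primary node to the instances that have this
      # node as secondary; those are the instances that would fail over
      # here if that primary node went down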
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                       master_files):
    """Verifies and computes the node required file checksums.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param master_files: list of files that only masters should have

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_cksum = nresult.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if test:
      return

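    # each required file is checked against the master's checksum; files
    # listed in master_files are only mandatory on master candidates and
    # must not be present at all on the remaining nodes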
    for file_name in file_list:
      node_is_mc = ninfo.master_candidate
      must_have = (file_name not in master_files) or node_is_mc
      # missing
      test1 = file_name not in remote_cksum
      # invalid checksum
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
      # existing and good
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
               "file '%s' missing", file_name)
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
               "file '%s' has wrong checksum", file_name)
      # not candidate and this is not a must-have file
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist on non master"
               " candidates (and the file is outdated)", file_name)
      # all good, except non-master/non-must have combination
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist"
               " on non master candidates", file_name)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, self.ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, self.ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
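    # used_minors is the list of DRBD minors the node reports as being in
    # use; the checks below go both ways: configured minors that are not
    # active, and active minors that are not in the configuration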
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, self.ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, self.ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # this will be caught in the backend too
      _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api)
               and not f_var, self.ENODEOS, node,
               "OS %s with API at least %d does not declare any variant",
               os_name, constants.OS_API_V15)
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, self.ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", f_param, b_param)]:
        _ErrorIf(a != b, self.ENODEOS, node,
                 "OS %s %s differs from reference node %s: %s vs. %s",
                 kind, os_name, base.name,
                 utils.CommaJoin(a), utils.CommaJoin(b))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, self.ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    node_disks = {}
    node_disks_devonly = {}

    for nname in nodelist:
      disks = [(inst, disk)
               for instlist in [node_image[nname].pinst,
                                node_image[nname].sinst]
               for inst in instlist
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # Creating copies as SetDiskID below will modify the objects and that can
      # lead to incorrect data returned from nodes
      devonly = [dev.Copy() for (_, dev) in disks]

      for dev in devonly:
        self.cfg.SetDiskID(dev, nname)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

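    # instdisk maps instance name -> node name -> list of per-disk status
    # results collected from the RPC above (None entries when a node
    # returned no usable data)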
    instdisk = {}

    for (nname, nres) in result.items():
      if nres.offline:
        # Ignore offline node
        continue

      disks = node_disks[nname]

      msg = nres.fail_msg
      _ErrorIf(msg, self.ENODERPC, nname,
               "while getting disk information: %s", nres.fail_msg)
      if msg:
        # No data from this node
        data = len(disks) * [None]
      else:
        data = nres.payload

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())

    return instdisk

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    # Check the cluster certificates
    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    cluster = self.cfg.GetClusterInfo()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.extend(constants.ALL_CERT_FILES)
    file_names.extend(master_files)
    if cluster.modify_etc_hosts:
      file_names.append(constants.ETC_HOSTS)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
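    # node_verify_param selects which checks each node should run and
    # report back in the single node_verify RPC below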
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    if drbd_helper:
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in nodeinfo)

    for instance in instancelist:
      inst_config = instanceinfo[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          # ghost node
          gnode = self.NodeImage(name=nname)
          gnode.ghost = True
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in nodeinfo:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                            master_files)

      if nimg.vm_capable:
        self._VerifyNodeLVM(node_i, nresult, vg_name)
        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)
        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      if pnode_img.offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost/non-vm_capable nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)
        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
                 instance, "instance lives on non-vm_capable node %s", node)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    feedback_fn("* Verifying orphan instances")
    self._VerifyOrphanInstances(instancelist, node_image)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, instanceinfo)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    vg_names = self.rpc.call_vg_list(nodes)
    vg_names.Raise("Cannot get list of VGs")

    for node in nodes:
      # node_volume
      node_res = self.rpc.call_lv_list([node],
                                       vg_names[node].payload.keys())[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
2520
    """Verify the size of cluster disks.
2521

2522
    """
2523
    # TODO: check child disks too
2524
    # TODO: check differences in size between primary/secondary nodes
2525
    per_node_disks = {}
2526
    for instance in self.wanted_instances:
2527
      pnode = instance.primary_node
2528
      if pnode not in per_node_disks:
2529
        per_node_disks[pnode] = []
2530
      for idx, disk in enumerate(instance.disks):
2531
        per_node_disks[pnode].append((instance, idx, disk))
2532

    
2533
    changed = []
2534
    for node, dskl in per_node_disks.items():
2535
      newl = [v[2].Copy() for v in dskl]
2536
      for dsk in newl:
2537
        self.cfg.SetDiskID(dsk, node)
2538
      result = self.rpc.call_blockdev_getsizes(node, newl)
2539
      if result.fail_msg:
2540
        self.LogWarning("Failure in blockdev_getsizes call to node"
2541
                        " %s, ignoring", node)
2542
        continue
2543
      if len(result.data) != len(dskl):
2544
        self.LogWarning("Invalid result from node %s, ignoring node results",
2545
                        node)
2546
        continue
2547
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2548
        if size is None:
2549
          self.LogWarning("Disk %d of instance %s did not return size"
2550
                          " information, ignoring", idx, instance.name)
2551
          continue
2552
        if not isinstance(size, (int, long)):
2553
          self.LogWarning("Disk %d of instance %s did not return valid"
2554
                          " size information, ignoring", idx, instance.name)
2555
          continue
2556
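        # the node reports the device size in bytes while disk.size in the
        # configuration is in MiB, hence the shift by 20 bits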
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    all_nodes = self.cfg.GetNodeList()
    return env, [mn], all_nodes

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_PARAMS = [
    ("vg_name", None, ht.TMaybeString),
    ("enabled_hypervisors", None,
     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
            ht.TNone)),
    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                              ht.TNone)),
    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                            ht.TNone)),
    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                              ht.TNone)),
    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
    ("uid_pool", None, ht.NoType),
    ("add_uids", None, ht.NoType),
    ("remove_uids", None, ht.NoType),
    ("maintain_node_health", None, ht.TMaybeBool),
    ("prealloc_wipe_disks", None, ht.TMaybeBool),
    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
    ("hidden_os", None, ht.TOr(ht.TListOf(\
          ht.TAnd(ht.TList,
                ht.TIsLength(2),
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
          ht.TNone)),
    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
          ht.TAnd(ht.TList,
                ht.TIsLength(2),
                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
          ht.TNone)),
    ]
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for node in node_list:
        ninfo = self.cfg.GetNodeInfo(node)
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
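    # new_hvparams starts as a copy of the current cluster-level hvparams;
    # values supplied in the opcode are then merged on top per hypervisor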
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
2920
      new_helper = self.op.drbd_helper
2921
      if not new_helper:
2922
        new_helper = None
2923
      if new_helper != self.cfg.GetDRBDHelper():
2924
        self.cfg.SetDRBDHelper(new_helper)
2925
      else:
2926
        feedback_fn("Cluster DRBD helper already in desired state,"
2927
                    " not changing")
2928
    if self.op.hvparams:
2929
      self.cluster.hvparams = self.new_hvparams
2930
    if self.op.os_hvp:
2931
      self.cluster.os_hvp = self.new_os_hvp
2932
    if self.op.enabled_hypervisors is not None:
2933
      self.cluster.hvparams = self.new_hvparams
2934
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2935
    if self.op.beparams:
2936
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2937
    if self.op.nicparams:
2938
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2939
    if self.op.osparams:
2940
      self.cluster.osparams = self.new_osp
2941
    if self.op.ndparams:
2942
      self.cluster.ndparams = self.new_ndparams
2943

    
2944
    if self.op.candidate_pool_size is not None:
2945
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2946
      # we need to update the pool size here, otherwise the save will fail
2947
      _AdjustCandidatePool(self, [])
2948

    
2949
    if self.op.maintain_node_health is not None:
2950
      self.cluster.maintain_node_health = self.op.maintain_node_health
2951

    
2952
    if self.op.prealloc_wipe_disks is not None:
2953
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
2954

    
2955
    if self.op.add_uids is not None:
2956
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
2957

    
2958
    if self.op.remove_uids is not None:
2959
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
2960

    
2961
    if self.op.uid_pool is not None:
2962
      self.cluster.uid_pool = self.op.uid_pool
2963

    
2964
    if self.op.default_iallocator is not None:
2965
      self.cluster.default_iallocator = self.op.default_iallocator
2966

    
2967
    if self.op.reserved_lvs is not None:
2968
      self.cluster.reserved_lvs = self.op.reserved_lvs
2969

    
2970
    def helper_os(aname, mods, desc):
2971
      desc += " OS list"
2972
      lst = getattr(self.cluster, aname)
2973
      for key, val in mods:
2974
        if key == constants.DDM_ADD:
2975
          if val in lst:
2976
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
2977
          else:
2978
            lst.append(val)
2979
        elif key == constants.DDM_REMOVE:
2980
          if val in lst:
2981
            lst.remove(val)
2982
          else:
2983
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
2984
        else:
2985
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
2986

    
2987
    if self.op.hidden_os:
2988
      helper_os("hidden_os", self.op.hidden_os, "hidden")
2989

    
2990
    if self.op.blacklisted_os:
2991
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
2992

    
2993
    self.cfg.Update(self.cluster, feedback_fn)
2994

    
2995

    
2996
def _UploadHelper(lu, nodes, fname):
2997
  """Helper for uploading a file and showing warnings.
2998

2999
  """
3000
  if os.path.exists(fname):
3001
    result = lu.rpc.call_upload_file(nodes, fname)
3002
    for to_node, to_result in result.items():
3003
      msg = to_result.fail_msg
3004
      if msg:
3005
        msg = ("Copy of file %s to node %s failed: %s" %
3006
               (fname, to_node, msg))
3007
        lu.proc.LogWarning(msg)
3008

    
3009

    
3010
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
3011
  """Distribute additional files which are part of the cluster configuration.
3012

3013
  ConfigWriter takes care of distributing the config and ssconf files, but
3014
  there are more files which should be distributed to all nodes. This function
3015
  makes sure those are copied.
3016

3017
  @param lu: calling logical unit
3018
  @param additional_nodes: list of nodes not in the config to distribute to
3019
  @type additional_vm: boolean
3020
  @param additional_vm: whether the additional nodes are vm-capable or not
3021

3022
  """
3023
  # 1. Gather target nodes
3024
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
3025
  dist_nodes = lu.cfg.GetOnlineNodeList()
3026
  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
3027
  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
3028
  if additional_nodes is not None:
3029
    dist_nodes.extend(additional_nodes)
3030
    if additional_vm:
3031
      vm_nodes.extend(additional_nodes)
3032
  if myself.name in dist_nodes:
3033
    dist_nodes.remove(myself.name)
3034
  if myself.name in vm_nodes:
3035
    vm_nodes.remove(myself.name)
3036

    
3037
  # 2. Gather files to distribute
3038
  dist_files = set([constants.ETC_HOSTS,
3039
                    constants.SSH_KNOWN_HOSTS_FILE,
3040
                    constants.RAPI_CERT_FILE,
3041
                    constants.RAPI_USERS_FILE,
3042
                    constants.CONFD_HMAC_KEY,
3043
                    constants.CLUSTER_DOMAIN_SECRET_FILE,
3044
                   ])
3045

    
3046
  vm_files = set()
3047
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
3048
  for hv_name in enabled_hypervisors:
3049
    hv_class = hypervisor.GetHypervisor(hv_name)
3050
    vm_files.update(hv_class.GetAncillaryFiles())
3051

    
3052
  # 3. Perform the files upload
3053
  for fname in dist_files:
3054
    _UploadHelper(lu, dist_nodes, fname)
3055
  for fname in vm_files:
3056
    _UploadHelper(lu, vm_nodes, fname)
3057

    
3058

    
3059
class LURedistributeConfig(NoHooksLU):
3060
  """Force the redistribution of cluster configuration.
3061

3062
  This is a very simple LU.
3063

3064
  """
3065
  REQ_BGL = False
3066

    
3067
  def ExpandNames(self):
3068
    self.needed_locks = {
3069
      locking.LEVEL_NODE: locking.ALL_SET,
3070
    }
3071
    self.share_locks[locking.LEVEL_NODE] = 1
3072

    
3073
  def Exec(self, feedback_fn):
3074
    """Redistribute the configuration.
3075

3076
    """
3077
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
3078
    _RedistributeAncillaryFiles(self)
3079

    
3080

    
3081
def _WaitForSync(lu, instance, disks=None, oneshot=False):
3082
  """Sleep and poll for an instance's disk to sync.
3083

3084
  """
3085
  if not instance.disks or disks is not None and not disks:
3086
    return True
3087

    
3088
  disks = _ExpandCheckDisks(instance, disks)
3089

    
3090
  if not oneshot:
3091
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
3092

    
3093
  node = instance.primary_node
3094

    
3095
  for dev in disks:
3096
    lu.cfg.SetDiskID(dev, node)
3097

    
3098
  # TODO: Convert to utils.Retry
3099

    
3100
  retries = 0
3101
  degr_retries = 10 # in seconds, as we sleep 1 second each time
3102
  while True:
3103
    max_time = 0
3104
    done = True
3105
    cumul_degraded = False
3106
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
3107
    msg = rstats.fail_msg
3108
    if msg:
3109
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
3110
      retries += 1
3111
      if retries >= 10:
3112
        raise errors.RemoteError("Can't contact node %s for mirror data,"
3113
                                 " aborting." % node)
3114
      time.sleep(6)
3115
      continue
3116
    rstats = rstats.payload
3117
    retries = 0
3118
    for i, mstat in enumerate(rstats):
3119
      if mstat is None:
3120
        lu.LogWarning("Can't compute data for node %s/%s",
3121
                           node, disks[i].iv_name)
3122
        continue
3123

    
3124
      cumul_degraded = (cumul_degraded or
3125
                        (mstat.is_degraded and mstat.sync_percent is None))
3126
      if mstat.sync_percent is not None:
3127
        done = False
3128
        if mstat.estimated_time is not None:
3129
          rem_time = ("%s remaining (estimated)" %
3130
                      utils.FormatSeconds(mstat.estimated_time))
3131
          max_time = mstat.estimated_time
3132
        else:
3133
          rem_time = "no time estimate"
3134
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
3135
                        (disks[i].iv_name, mstat.sync_percent, rem_time))
3136

    
3137
    # if we're done but degraded, let's do a few small retries, to
3138
    # make sure we see a stable and not transient situation; therefore
3139
    # we force restart of the loop
3140
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
3141
      logging.info("Degraded disks found, %d retries left", degr_retries)
3142
      degr_retries -= 1
3143
      time.sleep(1)
3144
      continue
3145

    
3146
    if done or oneshot:
3147
      break
3148

    
3149
    time.sleep(min(60, max_time))
3150

    
3151
  if done:
3152
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
3153
  return not cumul_degraded
3154

    
3155

    
3156
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
3157
  """Check that mirrors are not degraded.
3158

3159
  The ldisk parameter, if True, will change the test from the
3160
  is_degraded attribute (which represents overall non-ok status for
3161
  the device(s)) to the ldisk (representing the local storage status).
3162

3163
  """
3164
  lu.cfg.SetDiskID(dev, node)
3165

    
3166
  result = True
3167

    
3168
  if on_primary or dev.AssembleOnSecondary():
3169
    rstats = lu.rpc.call_blockdev_find(node, dev)
3170
    msg = rstats.fail_msg
3171
    if msg:
3172
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
3173
      result = False
3174
    elif not rstats.payload:
3175
      lu.LogWarning("Can't find disk on node %s", node)
3176
      result = False
3177
    else:
3178
      if ldisk:
3179
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
3180
      else:
3181
        result = result and not rstats.payload.is_degraded
3182

    
3183
  if dev.children:
3184
    for child in dev.children:
3185
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
3186

    
3187
  return result
3188

    
3189

    
3190
class LUOutOfBand(NoHooksLU):
3191
  """Logical unit for OOB handling.
3192

3193
  """
3194
  _OP_PARAMS = [
3195
    _PNodeName,
3196
    ("command", None, ht.TElemOf(constants.OOB_COMMANDS)),
3197
    ("timeout", constants.OOB_TIMEOUT, ht.TInt),
3198
    ]
3199
  REG_BGL = False
3200

    
3201
  def CheckPrereq(self):
3202
    """Check prerequisites.
3203

3204
    This checks:
3205
     - the node exists in the configuration
3206
     - OOB is supported
3207

3208
    Any errors are signaled by raising errors.OpPrereqError.
3209

3210
    """
3211
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3212
    node = self.cfg.GetNodeInfo(self.op.node_name)
3213

    
3214
    if node is None:
3215
      raise errors.OpPrereqError("Node %s not found" % self.op.node_name)
3216

    
3217
    self.oob_program = self.cfg.GetOobProgram(node)
3218

    
3219
    if not self.oob_program:
3220
      raise errors.OpPrereqError("OOB is not supported for node %s" %
3221
                                 self.op.node_name)
3222

    
3223
    self.op.node_name = node.name
3224
    self.node = node
3225

    
3226
  def ExpandNames(self):
3227
    """Gather locks we need.
3228

3229
    """
3230
    self.needed_locks = {
3231
      locking.LEVEL_NODE: [self.op.node_name],
3232
      }
3233

    
3234
  def Exec(self, feedback_fn):
3235
    """Execute OOB and return result if we expect any.
3236

3237
    """
3238
    master_node = self.cfg.GetMasterNode()
3239

    
3240
    logging.info("Executing out-of-band command '%s' using '%s' on %s",
3241
                 self.op.command, self.oob_program, self.op.node_name)
3242
    result = self.rpc.call_run_oob(master_node, self.oob_program,
3243
                                   self.op.command, self.op.node_name,
3244
                                   self.op.timeout)
3245

    
3246
    result.Raise("An error occurred on execution of OOB helper")
3247

    
3248
    if self.op.command == constants.OOB_HEALTH:
3249
      # For health we should log important events
3250
      for item, status in result.payload:
3251
        if status in [constants.OOB_STATUS_WARNING,
3252
                      constants.OOB_STATUS_CRITICAL]:
3253
          logging.warning("On node '%s' item '%s' has status '%s'",
3254
                          self.op.node_name, item, status)
3255

    
3256
    return result.payload
3257

    
3258

    
3259
class LUDiagnoseOS(NoHooksLU):
3260
  """Logical unit for OS diagnose/query.
3261

3262
  """
3263
  _OP_PARAMS = [
3264
    _POutputFields,
3265
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3266
    ]
3267
  REQ_BGL = False
3268
  _HID = "hidden"
3269
  _BLK = "blacklisted"
3270
  _VLD = "valid"
3271
  _FIELDS_STATIC = utils.FieldSet()
3272
  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
3273
                                   "parameters", "api_versions", _HID, _BLK)
3274

    
3275
  def CheckArguments(self):
3276
    if self.op.names:
3277
      raise errors.OpPrereqError("Selective OS query not supported",
3278
                                 errors.ECODE_INVAL)
3279

    
3280
    _CheckOutputFields(static=self._FIELDS_STATIC,
3281
                       dynamic=self._FIELDS_DYNAMIC,
3282
                       selected=self.op.output_fields)
3283

    
3284
  def ExpandNames(self):
3285
    # Lock all nodes, in shared mode
3286
    # Temporary removal of locks, should be reverted later
3287
    # TODO: reintroduce locks when they are lighter-weight
3288
    self.needed_locks = {}
3289
    #self.share_locks[locking.LEVEL_NODE] = 1
3290
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3291

    
3292
  @staticmethod
3293
  def _DiagnoseByOS(rlist):
3294
    """Remaps a per-node return list into an a per-os per-node dictionary
3295

3296
    @param rlist: a map with node names as keys and OS objects as values
3297

3298
    @rtype: dict
3299
    @return: a dictionary with osnames as keys and as value another
3300
        map, with nodes as keys and tuples of (path, status, diagnose,
3301
        variants, parameters, api_versions) as values, eg::
3302

3303
          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
3304
                                     (/srv/..., False, "invalid api")],
3305
                           "node2": [(/srv/..., True, "", [], [])]}
3306
          }
3307

3308
    """
3309
    all_os = {}
3310
    # we build here the list of nodes that didn't fail the RPC (at RPC
3311
    # level), so that nodes with a non-responding node daemon don't
3312
    # make all OSes invalid
3313
    good_nodes = [node_name for node_name in rlist
3314
                  if not rlist[node_name].fail_msg]
3315
    for node_name, nr in rlist.items():
3316
      if nr.fail_msg or not nr.payload:
3317
        continue
3318
      for (name, path, status, diagnose, variants,
3319
           params, api_versions) in nr.payload:
3320
        if name not in all_os:
3321
          # build a list of nodes for this os containing empty lists
3322
          # for each node in node_list
3323
          all_os[name] = {}
3324
          for nname in good_nodes:
3325
            all_os[name][nname] = []
3326
        # convert params from [name, help] to (name, help)
3327
        params = [tuple(v) for v in params]
3328
        all_os[name][node_name].append((path, status, diagnose,
3329
                                        variants, params, api_versions))
3330
    return all_os
3331

    
3332
  def Exec(self, feedback_fn):
3333
    """Compute the list of OSes.
3334

3335
    """
3336
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
3337
    node_data = self.rpc.call_os_diagnose(valid_nodes)
3338
    pol = self._DiagnoseByOS(node_data)
3339
    output = []
3340
    cluster = self.cfg.GetClusterInfo()
3341

    
3342
    for os_name in utils.NiceSort(pol.keys()):
3343
      os_data = pol[os_name]
3344
      row = []
3345
      valid = True
3346
      (variants, params, api_versions) = null_state = (set(), set(), set())
3347
      for idx, osl in enumerate(os_data.values()):
3348
        valid = bool(valid and osl and osl[0][1])
3349
        if not valid:
3350
          (variants, params, api_versions) = null_state
3351
          break
3352
        node_variants, node_params, node_api = osl[0][3:6]
3353
        if idx == 0: # first entry
3354
          variants = set(node_variants)
3355
          params = set(node_params)
3356
          api_versions = set(node_api)
3357
        else: # keep consistency
3358
          variants.intersection_update(node_variants)
3359
          params.intersection_update(node_params)
3360
          api_versions.intersection_update(node_api)
3361

    
3362
      is_hid = os_name in cluster.hidden_os
3363
      is_blk = os_name in cluster.blacklisted_os
3364
      if ((self._HID not in self.op.output_fields and is_hid) or
3365
          (self._BLK not in self.op.output_fields and is_blk) or
3366
          (self._VLD not in self.op.output_fields and not valid)):
3367
        continue
3368

    
3369
      for field in self.op.output_fields:
3370
        if field == "name":
3371
          val = os_name
3372
        elif field == self._VLD:
3373
          val = valid
3374
        elif field == "node_status":
3375
          # this is just a copy of the dict
3376
          val = {}
3377
          for node_name, nos_list in os_data.items():
3378
            val[node_name] = nos_list
3379
        elif field == "variants":
3380
          val = utils.NiceSort(list(variants))
3381
        elif field == "parameters":
3382
          val = list(params)
3383
        elif field == "api_versions":
3384
          val = list(api_versions)
3385
        elif field == self._HID:
3386
          val = is_hid
3387
        elif field == self._BLK:
3388
          val = is_blk
3389
        else:
3390
          raise errors.ParameterError(field)
3391
        row.append(val)
3392
      output.append(row)
3393

    
3394
    return output
3395

    
3396

    
3397
class LURemoveNode(LogicalUnit):
3398
  """Logical unit for removing a node.
3399

3400
  """
3401
  HPATH = "node-remove"
3402
  HTYPE = constants.HTYPE_NODE
3403
  _OP_PARAMS = [
3404
    _PNodeName,
3405
    ]
3406

    
3407
  def BuildHooksEnv(self):
3408
    """Build hooks env.
3409

3410
    This doesn't run on the target node in the pre phase as a failed
3411
    node would then be impossible to remove.
3412

3413
    """
3414
    env = {
3415
      "OP_TARGET": self.op.node_name,
3416
      "NODE_NAME": self.op.node_name,
3417
      }
3418
    all_nodes = self.cfg.GetNodeList()
3419
    try:
3420
      all_nodes.remove(self.op.node_name)
3421
    except ValueError:
3422
      logging.warning("Node %s which is about to be removed not found"
3423
                      " in the all nodes list", self.op.node_name)
3424
    return env, all_nodes, all_nodes
3425

    
3426
  def CheckPrereq(self):
3427
    """Check prerequisites.
3428

3429
    This checks:
3430
     - the node exists in the configuration
3431
     - it does not have primary or secondary instances
3432
     - it's not the master
3433

3434
    Any errors are signaled by raising errors.OpPrereqError.
3435

3436
    """
3437
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3438
    node = self.cfg.GetNodeInfo(self.op.node_name)
3439
    assert node is not None
3440

    
3441
    instance_list = self.cfg.GetInstanceList()
3442

    
3443
    masternode = self.cfg.GetMasterNode()
3444
    if node.name == masternode:
3445
      raise errors.OpPrereqError("Node is the master node,"
3446
                                 " you need to failover first.",
3447
                                 errors.ECODE_INVAL)
3448

    
3449
    for instance_name in instance_list:
3450
      instance = self.cfg.GetInstanceInfo(instance_name)
3451
      if node.name in instance.all_nodes:
3452
        raise errors.OpPrereqError("Instance %s is still running on the node,"
3453
                                   " please remove first." % instance_name,
3454
                                   errors.ECODE_INVAL)
3455
    self.op.node_name = node.name
3456
    self.node = node
3457

    
3458
  def Exec(self, feedback_fn):
3459
    """Removes the node from the cluster.
3460

3461
    """
3462
    node = self.node
3463
    logging.info("Stopping the node daemon and removing configs from node %s",
3464
                 node.name)
3465

    
3466
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
3467

    
3468
    # Promote nodes to master candidate as needed
3469
    _AdjustCandidatePool(self, exceptions=[node.name])
3470
    self.context.RemoveNode(node.name)
3471

    
3472
    # Run post hooks on the node before it's removed
3473
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
3474
    try:
3475
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
3476
    except:
3477
      # pylint: disable-msg=W0702
3478
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
3479

    
3480
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
3481
    msg = result.fail_msg
3482
    if msg:
3483
      self.LogWarning("Errors encountered on the remote node while leaving"
3484
                      " the cluster: %s", msg)
3485

    
3486
    # Remove node from our /etc/hosts
3487
    if self.cfg.GetClusterInfo().modify_etc_hosts:
3488
      master_node = self.cfg.GetMasterNode()
3489
      result = self.rpc.call_etc_hosts_modify(master_node,
3490
                                              constants.ETC_HOSTS_REMOVE,
3491
                                              node.name, None)
3492
      result.Raise("Can't update hosts file with new host data")
3493
      _RedistributeAncillaryFiles(self)
3494

    
3495

    
3496
class _NodeQuery(_QueryBase):
3497
  FIELDS = query.NODE_FIELDS
3498

    
3499
  def ExpandNames(self, lu):
3500
    lu.needed_locks = {}
3501
    lu.share_locks[locking.LEVEL_NODE] = 1
3502

    
3503
    if self.names:
3504
      self.wanted = _GetWantedNodes(lu, self.names)
3505
    else:
3506
      self.wanted = locking.ALL_SET
3507

    
3508
    self.do_locking = (self.use_locking and
3509
                       query.NQ_LIVE in self.requested_data)
3510

    
3511
    if self.do_locking:
3512
      # if we don't request only static fields, we need to lock the nodes
3513
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
3514

    
3515
  def DeclareLocks(self, _):
3516
    pass
3517

    
3518
  def _GetQueryData(self, lu):
3519
    """Computes the list of nodes and their attributes.
3520

3521
    """
3522
    all_info = lu.cfg.GetAllNodesInfo()
3523

    
3524
    if self.do_locking:
3525
      nodenames = lu.acquired_locks[locking.LEVEL_NODE]
3526
    elif self.wanted != locking.ALL_SET:
3527
      nodenames = self.wanted
3528
      missing = set(nodenames).difference(all_info.keys())
3529
      if missing:
3530
        raise errors.OpExecError("Some nodes were removed before retrieving"
3531
                                 " their data: %s" % missing)
3532
    else:
3533
      nodenames = all_info.keys()
3534

    
3535
    nodenames = utils.NiceSort(nodenames)
3536

    
3537
    # Gather data as requested
3538
    if query.NQ_LIVE in self.requested_data:
3539
      node_data = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
3540
                                        lu.cfg.GetHypervisorType())
3541
      live_data = dict((name, nresult.payload)
3542
                       for (name, nresult) in node_data.items()
3543
                       if not nresult.fail_msg and nresult.payload)
3544
    else:
3545
      live_data = None
3546

    
3547
    if query.NQ_INST in self.requested_data:
3548
      node_to_primary = dict([(name, set()) for name in nodenames])
3549
      node_to_secondary = dict([(name, set()) for name in nodenames])
3550

    
3551
      inst_data = lu.cfg.GetAllInstancesInfo()
3552

    
3553
      for inst in inst_data.values():
3554
        if inst.primary_node in node_to_primary:
3555
          node_to_primary[inst.primary_node].add(inst.name)
3556
        for secnode in inst.secondary_nodes:
3557
          if secnode in node_to_secondary:
3558
            node_to_secondary[secnode].add(inst.name)
3559
    else:
3560
      node_to_primary = None
3561
      node_to_secondary = None
3562

    
3563
    if query.NQ_GROUP in self.requested_data:
3564
      groups = lu.cfg.GetAllNodeGroupsInfo()
3565
    else:
3566
      groups = {}
3567

    
3568
    return query.NodeQueryData([all_info[name] for name in nodenames],
3569
                               live_data, lu.cfg.GetMasterNode(),
3570
                               node_to_primary, node_to_secondary, groups)
3571

    
3572

    
3573
class LUQueryNodes(NoHooksLU):
3574
  """Logical unit for querying nodes.
3575

3576
  """
3577
  # pylint: disable-msg=W0142
3578
  _OP_PARAMS = [
3579
    _POutputFields,
3580
    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3581
    ("use_locking", False, ht.TBool),
3582
    ]
3583
  REQ_BGL = False
3584

    
3585
  def CheckArguments(self):
3586
    self.nq = _NodeQuery(self.op.names, self.op.output_fields,
3587
                         self.op.use_locking)
3588

    
3589
  def ExpandNames(self):
3590
    self.nq.ExpandNames(self)
3591

    
3592
  def Exec(self, feedback_fn):
3593
    return self.nq.OldStyleQuery(self)
3594

    
3595

    
3596
class LUQueryNodeVolumes(NoHooksLU):
3597
  """Logical unit for getting volumes on node(s).
3598

3599
  """
3600
  _OP_PARAMS = [
3601
    _POutputFields,
3602
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3603
    ]
3604
  REQ_BGL = False
3605
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
3606
  _FIELDS_STATIC = utils.FieldSet("node")
3607

    
3608
  def CheckArguments(self):
3609
    _CheckOutputFields(static=self._FIELDS_STATIC,
3610
                       dynamic=self._FIELDS_DYNAMIC,
3611
                       selected=self.op.output_fields)
3612

    
3613
  def ExpandNames(self):
3614
    self.needed_locks = {}
3615
    self.share_locks[locking.LEVEL_NODE] = 1
3616
    if not self.op.nodes:
3617
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3618
    else:
3619
      self.needed_locks[locking.LEVEL_NODE] = \
3620
        _GetWantedNodes(self, self.op.nodes)
3621

    
3622
  def Exec(self, feedback_fn):
3623
    """Computes the list of nodes and their attributes.
3624

3625
    """
3626
    nodenames = self.acquired_locks[locking.LEVEL_NODE]
3627
    volumes = self.rpc.call_node_volumes(nodenames)
3628

    
3629
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
3630
             in self.cfg.GetInstanceList()]
3631

    
3632
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
3633

    
3634
    output = []
3635
    for node in nodenames:
3636
      nresult = volumes[node]
3637
      if nresult.offline:
3638
        continue
3639
      msg = nresult.fail_msg
3640
      if msg:
3641
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
3642
        continue
3643

    
3644
      node_vols = nresult.payload[:]
3645
      node_vols.sort(key=lambda vol: vol['dev'])
3646

    
3647
      for vol in node_vols:
3648
        node_output = []
3649
        for field in self.op.output_fields:
3650
          if field == "node":
3651
            val = node
3652
          elif field == "phys":
3653
            val = vol['dev']
3654
          elif field == "vg":
3655
            val = vol['vg']
3656
          elif field == "name":
3657
            val = vol['name']
3658
          elif field == "size":
3659
            val = int(float(vol['size']))
3660
          elif field == "instance":
3661
            for inst in ilist:
3662
              if node not in lv_by_node[inst]:
3663
                continue
3664
              if vol['name'] in lv_by_node[inst][node]:
3665
                val = inst.name
3666
                break
3667
            else:
3668
              val = '-'
3669
          else:
3670
            raise errors.ParameterError(field)
3671
          node_output.append(str(val))
3672

    
3673
        output.append(node_output)
3674

    
3675
    return output
3676

    
3677

    
3678
class LUQueryNodeStorage(NoHooksLU):
3679
  """Logical unit for getting information on storage units on node(s).
3680

3681
  """
3682
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
3683
  _OP_PARAMS = [
3684
    _POutputFields,
3685
    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
3686
    ("storage_type", ht.NoDefault, _CheckStorageType),
3687
    ("name", None, ht.TMaybeString),
3688
    ]
3689
  REQ_BGL = False
3690

    
3691
  def CheckArguments(self):
3692
    _CheckOutputFields(static=self._FIELDS_STATIC,
3693
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
3694
                       selected=self.op.output_fields)
3695

    
3696
  def ExpandNames(self):
3697
    self.needed_locks = {}
3698
    self.share_locks[locking.LEVEL_NODE] = 1
3699

    
3700
    if self.op.nodes:
3701
      self.needed_locks[locking.LEVEL_NODE] = \
3702
        _GetWantedNodes(self, self.op.nodes)
3703
    else:
3704
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3705

    
3706
  def Exec(self, feedback_fn):
3707
    """Computes the list of nodes and their attributes.
3708

3709
    """
3710
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
3711

    
3712
    # Always get name to sort by
3713
    if constants.SF_NAME in self.op.output_fields:
3714
      fields = self.op.output_fields[:]
3715
    else:
3716
      fields = [constants.SF_NAME] + self.op.output_fields
3717

    
3718
    # Never ask for node or type as it's only known to the LU
3719
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
3720
      while extra in fields:
3721
        fields.remove(extra)
3722

    
3723
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
3724
    name_idx = field_idx[constants.SF_NAME]
3725

    
3726
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3727
    data = self.rpc.call_storage_list(self.nodes,
3728
                                      self.op.storage_type, st_args,
3729
                                      self.op.name, fields)
3730

    
3731
    result = []
3732

    
3733
    for node in utils.NiceSort(self.nodes):
3734
      nresult = data[node]
3735
      if nresult.offline:
3736
        continue
3737

    
3738
      msg = nresult.fail_msg
3739
      if msg:
3740
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
3741
        continue
3742

    
3743
      rows = dict([(row[name_idx], row) for row in nresult.payload])
3744

    
3745
      for name in utils.NiceSort(rows.keys()):
3746
        row = rows[name]
3747

    
3748
        out = []
3749

    
3750
        for field in self.op.output_fields:
3751
          if field == constants.SF_NODE:
3752
            val = node
3753
          elif field == constants.SF_TYPE:
3754
            val = self.op.storage_type
3755
          elif field in field_idx:
3756
            val = row[field_idx[field]]
3757
          else:
3758
            raise errors.ParameterError(field)
3759

    
3760
          out.append(val)
3761

    
3762
        result.append(out)
3763

    
3764
    return result
3765

    
3766

    
3767
def _InstanceQuery(*args): # pylint: disable-msg=W0613
3768
  """Dummy until instance queries have been converted to query2.
3769

3770
  """
3771
  raise NotImplementedError
3772

    
3773

    
3774
#: Query type implementations
3775
_QUERY_IMPL = {
3776
  constants.QR_INSTANCE: _InstanceQuery,
3777
  constants.QR_NODE: _NodeQuery,
3778
  }
3779

    
3780

    
3781
def _GetQueryImplementation(name):
3782
  """Returns the implemtnation for a query type.
3783

3784
  @param name: Query type, must be one of L{constants.QR_OP_QUERY}
3785

3786
  """
3787
  try:
3788
    return _QUERY_IMPL[name]
3789
  except KeyError:
3790
    raise errors.OpPrereqError("Unknown query resource '%s'" % name,
3791
                               errors.ECODE_INVAL)
3792

    
3793

    
3794
class LUQuery(NoHooksLU):
3795
  """Query for resources/items of a certain kind.
3796

3797
  """
3798
  # pylint: disable-msg=W0142
3799
  _OP_PARAMS = [
3800
    ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)),
3801
    ("fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
3802
    ("filter", None, ht.TOr(ht.TNone,
3803
                            ht.TListOf(ht.TOr(ht.TNonEmptyString, ht.TList)))),
3804
    ]
3805
  REQ_BGL = False
3806

    
3807
  def CheckArguments(self):
3808
    qcls = _GetQueryImplementation(self.op.what)
3809
    names = qlang.ReadSimpleFilter("name", self.op.filter)
3810

    
3811
    self.impl = qcls(names, self.op.fields, False)
3812

    
3813
  def ExpandNames(self):
3814
    self.impl.ExpandNames(self)
3815

    
3816
  def DeclareLocks(self, level):
3817
    self.impl.DeclareLocks(self, level)
3818

    
3819
  def Exec(self, feedback_fn):
3820
    return self.impl.NewStyleQuery(self)
3821

    
3822

    
3823
class LUQueryFields(NoHooksLU):
3824
  """Query for resources/items of a certain kind.
3825

3826
  """
3827
  # pylint: disable-msg=W0142
3828
  _OP_PARAMS = [
3829
    ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)),
3830
    ("fields", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))),
3831
    ]
3832
  REQ_BGL = False
3833

    
3834
  def CheckArguments(self):
3835
    self.qcls = _GetQueryImplementation(self.op.what)
3836

    
3837
  def ExpandNames(self):
3838
    self.needed_locks = {}
3839

    
3840
  def Exec(self, feedback_fn):
3841
    return self.qcls.FieldsQuery(self.op.fields)
3842

    
3843

    
3844
class LUModifyNodeStorage(NoHooksLU):
3845
  """Logical unit for modifying a storage volume on a node.
3846

3847
  """
3848
  _OP_PARAMS = [
3849
    _PNodeName,
3850
    ("storage_type", ht.NoDefault, _CheckStorageType),
3851
    ("name", ht.NoDefault, ht.TNonEmptyString),
3852
    ("changes", ht.NoDefault, ht.TDict),
3853
    ]
3854
  REQ_BGL = False
3855

    
3856
  def CheckArguments(self):
3857
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3858

    
3859
    storage_type = self.op.storage_type
3860

    
3861
    try:
3862
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
3863
    except KeyError:
3864
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
3865
                                 " modified" % storage_type,
3866
                                 errors.ECODE_INVAL)
3867

    
3868
    diff = set(self.op.changes.keys()) - modifiable
3869
    if diff:
3870
      raise errors.OpPrereqError("The following fields can not be modified for"
3871
                                 " storage units of type '%s': %r" %
3872
                                 (storage_type, list(diff)),
3873
                                 errors.ECODE_INVAL)
3874

    
3875
  def ExpandNames(self):
3876
    self.needed_locks = {
3877
      locking.LEVEL_NODE: self.op.node_name,
3878
      }
3879

    
3880
  def Exec(self, feedback_fn):
3881
    """Computes the list of nodes and their attributes.
3882

3883
    """
3884
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
3885
    result = self.rpc.call_storage_modify(self.op.node_name,
3886
                                          self.op.storage_type, st_args,
3887
                                          self.op.name, self.op.changes)
3888
    result.Raise("Failed to modify storage unit '%s' on %s" %
3889
                 (self.op.name, self.op.node_name))
3890

    
3891

    
3892
class LUAddNode(LogicalUnit):
3893
  """Logical unit for adding node to the cluster.
3894

3895
  """
3896
  HPATH = "node-add"
3897
  HTYPE = constants.HTYPE_NODE
3898
  _OP_PARAMS = [
3899
    _PNodeName,
3900
    ("primary_ip", None, ht.NoType),
3901
    ("secondary_ip", None, ht.TMaybeString),
3902
    ("readd", False, ht.TBool),
3903
    ("group", None, ht.TMaybeString),
3904
    ("master_capable", None, ht.TMaybeBool),
3905
    ("vm_capable", None, ht.TMaybeBool),
3906
    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
3907
    ]
3908
  _NFLAGS = ["master_capable", "vm_capable"]
3909

    
3910
  def CheckArguments(self):
3911
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
3912
    # validate/normalize the node name
3913
    self.hostname = netutils.GetHostname(name=self.op.node_name,
3914
                                         family=self.primary_ip_family)
3915
    self.op.node_name = self.hostname.name
3916
    if self.op.readd and self.op.group:
3917
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
3918
                                 " being readded", errors.ECODE_INVAL)
3919

    
3920
  def BuildHooksEnv(self):
3921
    """Build hooks env.
3922

3923
    This will run on all nodes before, and on all nodes + the new node after.
3924

3925
    """
3926
    env = {
3927
      "OP_TARGET": self.op.node_name,
3928
      "NODE_NAME": self.op.node_name,
3929
      "NODE_PIP": self.op.primary_ip,
3930
      "NODE_SIP": self.op.secondary_ip,
3931
      "MASTER_CAPABLE": str(self.op.master_capable),
3932
      "VM_CAPABLE": str(self.op.vm_capable),
3933
      }
3934
    nodes_0 = self.cfg.GetNodeList()
3935
    nodes_1 = nodes_0 + [self.op.node_name, ]
3936
    return env, nodes_0, nodes_1
3937

    
3938
  def CheckPrereq(self):
3939
    """Check prerequisites.
3940

3941
    This checks:
3942
     - the new node is not already in the config
3943
     - it is resolvable
3944
     - its parameters (single/dual homed) matches the cluster
3945

3946
    Any errors are signaled by raising errors.OpPrereqError.
3947

3948
    """
3949
    cfg = self.cfg
3950
    hostname = self.hostname
3951
    node = hostname.name
3952
    primary_ip = self.op.primary_ip = hostname.ip
3953
    if self.op.secondary_ip is None:
3954
      if self.primary_ip_family == netutils.IP6Address.family:
3955
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
3956
                                   " IPv4 address must be given as secondary",
3957
                                   errors.ECODE_INVAL)
3958
      self.op.secondary_ip = primary_ip
3959

    
3960
    secondary_ip = self.op.secondary_ip
3961
    if not netutils.IP4Address.IsValid(secondary_ip):
3962
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
3963
                                 " address" % secondary_ip, errors.ECODE_INVAL)
3964

    
3965
    node_list = cfg.GetNodeList()
3966
    if not self.op.readd and node in node_list:
3967
      raise errors.OpPrereqError("Node %s is already in the configuration" %
3968
                                 node, errors.ECODE_EXISTS)
3969
    elif self.op.readd and node not in node_list:
3970
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
3971
                                 errors.ECODE_NOENT)
3972

    
3973
    self.changed_primary_ip = False
3974

    
3975
    for existing_node_name in node_list:
3976
      existing_node = cfg.GetNodeInfo(existing_node_name)
3977

    
3978
      if self.op.readd and node == existing_node_name:
3979
        if existing_node.secondary_ip != secondary_ip:
3980
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
3981
                                     " address configuration as before",
3982
                                     errors.ECODE_INVAL)
3983
        if existing_node.primary_ip != primary_ip:
3984
          self.changed_primary_ip = True
3985

    
3986
        continue
3987

    
3988
      if (existing_node.primary_ip == primary_ip or
3989
          existing_node.secondary_ip == primary_ip or
3990
          existing_node.primary_ip == secondary_ip or
3991
          existing_node.secondary_ip == secondary_ip):
3992
        raise errors.OpPrereqError("New node ip address(es) conflict with"
3993
                                   " existing node %s" % existing_node.name,
3994
                                   errors.ECODE_NOTUNIQUE)
3995

    
3996
    # After this 'if' block, None is no longer a valid value for the
3997
    # _capable op attributes
3998
    if self.op.readd:
3999
      old_node = self.cfg.GetNodeInfo(node)
4000
      assert old_node is not None, "Can't retrieve locked node %s" % node
4001
      for attr in self._NFLAGS:
4002
        if getattr(self.op, attr) is None:
4003
          setattr(self.op, attr, getattr(old_node, attr))
4004
    else:
4005
      for attr in self._NFLAGS:
4006
        if getattr(self.op, attr) is None:
4007
          setattr(self.op, attr, True)
4008

    
4009
    if self.op.readd and not self.op.vm_capable:
4010
      pri, sec = cfg.GetNodeInstances(node)
4011
      if pri or sec:
4012
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
4013
                                   " flag set to false, but it already holds"
4014
                                   " instances" % node,
4015
                                   errors.ECODE_STATE)
4016

    
4017
    # check that the type of the node (single versus dual homed) is the
4018
    # same as for the master
4019
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
4020
    master_singlehomed = myself.secondary_ip == myself.primary_ip
4021
    newbie_singlehomed = secondary_ip == primary_ip
4022
    if master_singlehomed != newbie_singlehomed:
4023
      if master_singlehomed:
4024
        raise errors.OpPrereqError("The master has no secondary ip but the"
4025
                                   " new node has one",
4026
                                   errors.ECODE_INVAL)
4027
      else:
4028
        raise errors.OpPrereqError("The master has a secondary ip but the"
4029
                                   " new node doesn't have one",
4030
                                   errors.ECODE_INVAL)
4031

    
4032
    # checks reachability
4033
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
4034
      raise errors.OpPrereqError("Node not reachable by ping",
4035
                                 errors.ECODE_ENVIRON)
4036

    
4037
    if not newbie_singlehomed:
4038
      # check reachability from my secondary ip to newbie's secondary ip
4039
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
4040
                           source=myself.secondary_ip):
4041
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
4042
                                   " based ping to node daemon port",
4043
                                   errors.ECODE_ENVIRON)
4044

    
4045
    if self.op.readd:
4046
      exceptions = [node]
4047
    else:
4048
      exceptions = []
4049

    
4050
    if self.op.master_capable:
4051
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
4052
    else:
4053
      self.master_candidate = False
4054

    
4055
    if self.op.readd:
4056
      self.new_node = old_node
4057
    else:
4058
      node_group = cfg.LookupNodeGroup(self.op.group)
4059
      self.new_node = objects.Node(name=node,
4060
                                   primary_ip=primary_ip,
4061
                                   secondary_ip=secondary_ip,
4062
                                   master_candidate=self.master_candidate,
4063