#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left purely as a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level, omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this
    prefix will be added by the hooks runner. Also note additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If a hook should run on no nodes, an empty list (and not None) must
    be returned for it.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused-argument and
    # could-be-a-function warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to lock just some instances' nodes,
    or to lock only primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


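# The following class is purely an illustrative sketch: it is not part of
# Ganeti's opcode set and nothing dispatches to it. It shows the contract
# described in the LogicalUnit docstring above (ExpandNames declaring locks,
# CheckPrereq doing idempotent checks, Exec doing the work), together with
# the _ExpandAndLockInstance/_LockInstancesNodes locking pattern. The opcode
# attribute 'instance_name' is assumed to exist on the hypothetical opcode
# driving it.
class _ExampleInstanceNoopLU(NoHooksLU):
  """Illustrative no-op LU operating on a single instance (sketch only).

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expand self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # idempotent check only: the instance must still exist in the config
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name, errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on instance %s" % self.instance.name)
    return self.instance.name

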
class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU;
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


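# Illustrative sketch only (not used anywhere in Ganeti): a minimal tasklet
# and the way an LU would chain it via self.tasklets in ExpandNames; the
# tasklet name below is made up for the example.
class _ExampleNoopTasklet(Tasklet):
  """Tasklet that only reports the cluster name (sketch only).

  """
  def CheckPrereq(self):
    # nothing to verify; tasklets must not take locks themselves
    pass

  def Exec(self, feedback_fn):
    feedback_fn("Cluster name: %s" % self.cfg.GetClusterName())

# An LU using tasklets would typically do, in its ExpandNames:
#
#   self.tasklets = [_ExampleNoopTasklet(self)]
#
# after which the base LogicalUnit.CheckPrereq and Exec iterate over that
# list instead of requiring LU-level implementations.

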
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names to be checked and expanded
  @rtype: list
  @return: the list of node names, sorted
  @raise errors.OpPrereqError: if the nodes parameter is not a list
  @raise errors.ProgrammerError: if the nodes list is empty

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose names are to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is not a list
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @type selected: list
  @param selected: the user-selected output fields to check

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


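# Typical use (illustrative sketch; the field names below are made up and the
# call normally happens in a query LU's CheckPrereq or CheckArguments):
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pnode"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=self.op.output_fields)
#
# Any entry of self.op.output_fields matching neither set makes the call
# raise OpPrereqError listing the unknown field names.

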
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node does not support the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()



def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


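# For orientation (illustrative sketch, with made-up values): an instance
# "inst1.example.com" with one bridged NIC and one 10240 MB disk would yield,
# among others, the following hook environment entries:
#
#   OP_TARGET=inst1.example.com        INSTANCE_NAME=inst1.example.com
#   INSTANCE_PRIMARY=node1.example.com INSTANCE_STATUS=up
#   INSTANCE_NIC_COUNT=1               INSTANCE_NIC0_MODE=bridged
#   INSTANCE_NIC0_BRIDGE=xen-br0       INSTANCE_DISK_COUNT=1
#   INSTANCE_DISK0_SIZE=10240          INSTANCE_DISK0_MODE=rw
#
# plus one INSTANCE_BE_* / INSTANCE_HV_* entry per backend and hypervisor
# parameter; the hooks runner later prefixes every key with "GANETI_".

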
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


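# Worked example (illustrative, numbers made up): with candidate_pool_size=10
# and GetMasterCandidateStats reporting mc_now=3 current candidates out of a
# possible mc_should=3, adding the new node bumps mc_should to min(3 + 1, 10)
# = 4, so 3 < 4 and the node promotes itself; on a cluster already at the
# pool size limit the min() clamps mc_should and no promotion happens.

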
def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


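# Example of the naming scheme checked above (illustrative; the OS and
# variant names are made up): for an OS definition declaring
# supported_variants of ["squeeze", "lenny"], the user-supplied name
# "debian+squeeze" passes, while "debian" (no variant) or "debian+etch"
# (unknown variant) raise OpPrereqError. OSes without declared variants skip
# the check entirely.

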
def _GetNodeInstancesInner(cfg, fn):
  """Returns the list of instances for which fn(instance) is true.

  """
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


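# Sketch of the calling convention above (illustrative): for
# constants.ST_FILE the result is [[<file storage dir>]] - a single
# positional argument holding the list of storage directories expected by
# storage.FileStorage - while every other storage type currently gets an
# empty argument list.

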
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  """Returns the indices of an instance's disks that report a faulty status.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _FormatTimestamp(secs):
  """Formats a Unix timestamp as a date/time string (UTC, via time.gmtime).

  """
  return time.strftime("%F %T %Z", time.gmtime(secs))


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
  """Verifies certificate details for LUVerifyCluster.

  """
  if expired:
    msg = "Certificate %s is expired" % filename

    if not_before is not None and not_after is not None:
      msg += (" (valid from %s to %s)" %
              (_FormatTimestamp(not_before),
               _FormatTimestamp(not_after)))
    elif not_before is not None:
      msg += " (valid from %s)" % _FormatTimestamp(not_before)
    elif not_after is not None:
      msg += " (valid until %s)" % _FormatTimestamp(not_after)

    return (LUVerifyCluster.ETYPE_ERROR, msg)

  elif not_before is not None and not_before > now:
    return (LUVerifyCluster.ETYPE_WARNING,
            "Certificate %s not yet valid (valid from %s)" %
            (filename, _FormatTimestamp(not_before)))

  elif not_after is not None:
    remaining_days = int((not_after - now) / (24 * 3600))

    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))

    if remaining_days <= error_days:
      return (LUVerifyCluster.ETYPE_ERROR, msg)

    if remaining_days <= warn_days:
      return (LUVerifyCluster.ETYPE_WARNING, msg)

  return (None, None)


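# Worked example of the thresholds above (illustrative; assuming, for the
# sake of the example, warn_days=30 and error_days=7): a certificate 90 days
# from expiry yields (None, None), one 20 days out yields an ETYPE_WARNING,
# one 3 days out yields an ETYPE_ERROR, and an already expired certificate is
# always an ETYPE_ERROR regardless of the thresholds.

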
def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  # Depending on the pyOpenSSL version, this can just return (None, None)
  (not_before, not_after) = utils.GetX509CertValidity(cert)

  return _VerifyCertificateInner(filename, cert.has_expired(),
                                 not_before, not_after, time.time())


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
        of this node (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)

    """
    def __init__(self, offline=False):
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, ninfo, nresult):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)


    test = nresult.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM data.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

  def _VerifyInstance(self, instance, instanceconfig, node_image):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to, should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                       master_files):
    """Verifies and computes the node required file checksums.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param master_files: list of files that only masters should have

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_cksum = nresult.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if test:
      return

    for file_name in file_list:
      node_is_mc = ninfo.master_candidate
      must_have = (file_name not in master_files) or node_is_mc
      # missing
      test1 = file_name not in remote_cksum
      # invalid checksum
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
      # existing and good
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
               "file '%s' missing", file_name)
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
               "file '%s' has wrong checksum", file_name)
      # not candidate and this is not a must-have file
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist on non master"
               " candidates (and the file is outdated)", file_name)
      # all good, except non-master/non-must have combination
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist"
               " on non master candidates", file_name)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    # Check the cluster certificates
    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.extend(constants.ALL_CERT_FILES)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline))
                      for node in nodeinfo)

    for instance in instancelist:
      inst_config = instanceinfo[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          # ghost node
          gnode = self.NodeImage()
          gnode.ghost = True
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeLVM(node_i, nresult, vg_name)
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                            master_files)
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)

      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
      self._UpdateNodeInstances(node_i, nresult, nimg)
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_image)
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      if pnode_img.offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_image)

    feedback_fn("* Verifying orphan instances")
    self._VerifyOrphanInstances(instancelist, node_image)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, instanceinfo)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
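        # the node reports the size in bytes, while disk.size is kept in
        # MiB in the configuration, hence the conversion below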
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LURenameCluster(LogicalUnit):
2146
  """Rename the cluster.
2147

2148
  """
2149
  HPATH = "cluster-rename"
2150
  HTYPE = constants.HTYPE_CLUSTER
2151
  _OP_REQP = ["name"]
2152

    
2153
  def BuildHooksEnv(self):
2154
    """Build hooks env.
2155

2156
    """
2157
    env = {
2158
      "OP_TARGET": self.cfg.GetClusterName(),
2159
      "NEW_NAME": self.op.name,
2160
      }
2161
    mn = self.cfg.GetMasterNode()
2162
    all_nodes = self.cfg.GetNodeList()
2163
    return env, [mn], all_nodes
2164

    
2165
  def CheckPrereq(self):
2166
    """Verify that the passed name is a valid one.
2167

2168
    """
2169
    hostname = utils.GetHostInfo(self.op.name)
2170

    
2171
    new_name = hostname.name
2172
    self.ip = new_ip = hostname.ip
2173
    old_name = self.cfg.GetClusterName()
2174
    old_ip = self.cfg.GetMasterIP()
2175
    if new_name == old_name and new_ip == old_ip:
2176
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2177
                                 " cluster has changed",
2178
                                 errors.ECODE_INVAL)
2179
    if new_ip != old_ip:
2180
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2181
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2182
                                   " reachable on the network. Aborting." %
2183
                                   new_ip, errors.ECODE_NOTUNIQUE)
2184

    
2185
    self.op.name = new_name
2186

    
2187
  def Exec(self, feedback_fn):
2188
    """Rename the cluster.
2189

2190
    """
2191
    clustername = self.op.name
2192
    ip = self.ip
2193

    
2194
    # shutdown the master IP
2195
    master = self.cfg.GetMasterNode()
2196
    result = self.rpc.call_node_stop_master(master, False)
2197
    result.Raise("Could not disable the master role")
2198

    
2199
    try:
2200
      cluster = self.cfg.GetClusterInfo()
2201
      cluster.cluster_name = clustername
2202
      cluster.master_ip = ip
2203
      self.cfg.Update(cluster, feedback_fn)
2204

    
2205
      # update the known hosts file
2206
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2207
      node_list = self.cfg.GetNodeList()
2208
      try:
2209
        node_list.remove(master)
2210
      except ValueError:
2211
        pass
2212
      result = self.rpc.call_upload_file(node_list,
2213
                                         constants.SSH_KNOWN_HOSTS_FILE)
2214
      for to_node, to_result in result.iteritems():
2215
        msg = to_result.fail_msg
2216
        if msg:
2217
          msg = ("Copy of file %s to node %s failed: %s" %
2218
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
2219
          self.proc.LogWarning(msg)
2220

    
2221
    finally:
2222
      result = self.rpc.call_node_start_master(master, False, False)
2223
      msg = result.fail_msg
2224
      if msg:
2225
        self.LogWarning("Could not re-enable the master role on"
2226
                        " the master, please restart manually: %s", msg)
2227

    
2228

    
2229
def _RecursiveCheckIfLVMBased(disk):
2230
  """Check if the given disk or its children are lvm-based.
2231

2232
  @type disk: L{objects.Disk}
2233
  @param disk: the disk to check
2234
  @rtype: boolean
2235
  @return: boolean indicating whether a LD_LV dev_type was found or not
2236

2237
  """
2238
  if disk.children:
2239
    for chdisk in disk.children:
2240
      if _RecursiveCheckIfLVMBased(chdisk):
2241
        return True
2242
  return disk.dev_type == constants.LD_LV
2243

    
2244

    
2245
class LUSetClusterParams(LogicalUnit):
2246
  """Change the parameters of the cluster.
2247

2248
  """
2249
  HPATH = "cluster-modify"
2250
  HTYPE = constants.HTYPE_CLUSTER
2251
  _OP_REQP = []
2252
  REQ_BGL = False
2253

    
2254
  def CheckArguments(self):
2255
    """Check parameters
2256

2257
    """
2258
    if not hasattr(self.op, "candidate_pool_size"):
2259
      self.op.candidate_pool_size = None
2260
    if self.op.candidate_pool_size is not None:
2261
      try:
2262
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
2263
      except (ValueError, TypeError), err:
2264
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
2265
                                   str(err), errors.ECODE_INVAL)
2266
      if self.op.candidate_pool_size < 1:
2267
        raise errors.OpPrereqError("At least one master candidate needed",
2268
                                   errors.ECODE_INVAL)
2269

    
2270
    _CheckBooleanOpField(self.op, "maintain_node_health")
2271

    
2272
    if self.op.uid_pool:
2273
      uidpool.CheckUidPool(self.op.uid_pool)
2274

    
2275
  def ExpandNames(self):
2276
    # FIXME: in the future maybe other cluster params won't require checking on
2277
    # all nodes to be modified.
2278
    self.needed_locks = {
2279
      locking.LEVEL_NODE: locking.ALL_SET,
2280
    }
2281
    self.share_locks[locking.LEVEL_NODE] = 1
2282

    
2283
  def BuildHooksEnv(self):
2284
    """Build hooks env.
2285

2286
    """
2287
    env = {
2288
      "OP_TARGET": self.cfg.GetClusterName(),
2289
      "NEW_VG_NAME": self.op.vg_name,
2290
      }
2291
    mn = self.cfg.GetMasterNode()
2292
    return env, [mn], [mn]
2293

    
2294
  def CheckPrereq(self):
2295
    """Check prerequisites.
2296

2297
    This checks whether the given params don't conflict and
2298
    if the given volume group is valid.
2299

2300
    """
2301
    if self.op.vg_name is not None and not self.op.vg_name:
2302
      instances = self.cfg.GetAllInstancesInfo().values()
2303
      for inst in instances:
2304
        for disk in inst.disks:
2305
          if _RecursiveCheckIfLVMBased(disk):
2306
            raise errors.OpPrereqError("Cannot disable lvm storage while"
2307
                                       " lvm-based instances exist",
2308
                                       errors.ECODE_INVAL)
2309

    
2310
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2311

    
2312
    # if vg_name not None, checks given volume group on all nodes
2313
    if self.op.vg_name:
2314
      vglist = self.rpc.call_vg_list(node_list)
2315
      for node in node_list:
2316
        msg = vglist[node].fail_msg
2317
        if msg:
2318
          # ignoring down node
2319
          self.LogWarning("Error while gathering data on node %s"
2320
                          " (ignoring node): %s", node, msg)
2321
          continue
2322
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2323
                                              self.op.vg_name,
2324
                                              constants.MIN_VG_SIZE)
2325
        if vgstatus:
2326
          raise errors.OpPrereqError("Error on node '%s': %s" %
2327
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2328

    
2329
    self.cluster = cluster = self.cfg.GetClusterInfo()
2330
    # validate params changes
2331
    if self.op.beparams:
2332
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2333
      self.new_beparams = objects.FillDict(
2334
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2335

    
2336
    if self.op.nicparams:
2337
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2338
      self.new_nicparams = objects.FillDict(
2339
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2340
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2341
      nic_errors = []
2342

    
2343
      # check all instances for consistency
2344
      for instance in self.cfg.GetAllInstancesInfo().values():
2345
        for nic_idx, nic in enumerate(instance.nics):
2346
          params_copy = copy.deepcopy(nic.nicparams)
2347
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2348

    
2349
          # check parameter syntax
2350
          try:
2351
            objects.NIC.CheckParameterSyntax(params_filled)
2352
          except errors.ConfigurationError, err:
2353
            nic_errors.append("Instance %s, nic/%d: %s" %
2354
                              (instance.name, nic_idx, err))
2355

    
2356
          # if we're moving instances to routed, check that they have an ip
2357
          target_mode = params_filled[constants.NIC_MODE]
2358
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2359
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2360
                              (instance.name, nic_idx))
2361
      if nic_errors:
2362
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2363
                                   "\n".join(nic_errors))
2364

    
2365
    # hypervisor list/parameters
2366
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2367
    if self.op.hvparams:
2368
      if not isinstance(self.op.hvparams, dict):
2369
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2370
                                   errors.ECODE_INVAL)
2371
      for hv_name, hv_dict in self.op.hvparams.items():
2372
        if hv_name not in self.new_hvparams:
2373
          self.new_hvparams[hv_name] = hv_dict
2374
        else:
2375
          self.new_hvparams[hv_name].update(hv_dict)
2376

    
2377
    # os hypervisor parameters
2378
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2379
    if self.op.os_hvp:
2380
      if not isinstance(self.op.os_hvp, dict):
2381
        raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
2382
                                   errors.ECODE_INVAL)
2383
      for os_name, hvs in self.op.os_hvp.items():
2384
        if not isinstance(hvs, dict):
2385
          raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
2386
                                      " input"), errors.ECODE_INVAL)
2387
        if os_name not in self.new_os_hvp:
2388
          self.new_os_hvp[os_name] = hvs
2389
        else:
2390
          for hv_name, hv_dict in hvs.items():
2391
            if hv_name not in self.new_os_hvp[os_name]:
2392
              self.new_os_hvp[os_name][hv_name] = hv_dict
2393
            else:
2394
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2395

    
2396
    if self.op.enabled_hypervisors is not None:
2397
      self.hv_list = self.op.enabled_hypervisors
2398
      if not self.hv_list:
2399
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2400
                                   " least one member",
2401
                                   errors.ECODE_INVAL)
2402
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2403
      if invalid_hvs:
2404
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2405
                                   " entries: %s" %
2406
                                   utils.CommaJoin(invalid_hvs),
2407
                                   errors.ECODE_INVAL)
2408
    else:
2409
      self.hv_list = cluster.enabled_hypervisors
2410

    
2411
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2412
      # either the enabled list has changed, or the parameters have, validate
2413
      for hv_name, hv_params in self.new_hvparams.items():
2414
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2415
            (self.op.enabled_hypervisors and
2416
             hv_name in self.op.enabled_hypervisors)):
2417
          # either this is a new hypervisor, or its parameters have changed
2418
          hv_class = hypervisor.GetHypervisor(hv_name)
2419
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2420
          hv_class.CheckParameterSyntax(hv_params)
2421
          _CheckHVParams(self, node_list, hv_name, hv_params)
2422

    
2423
    if self.op.os_hvp:
2424
      # no need to check any newly-enabled hypervisors, since the
2425
      # defaults have already been checked in the above code-block
2426
      for os_name, os_hvp in self.new_os_hvp.items():
2427
        for hv_name, hv_params in os_hvp.items():
2428
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2429
          # we need to fill in the new os_hvp on top of the actual hv_p
2430
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2431
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2432
          hv_class = hypervisor.GetHypervisor(hv_name)
2433
          hv_class.CheckParameterSyntax(new_osp)
2434
          _CheckHVParams(self, node_list, hv_name, new_osp)
2435

    
2436

    
2437
  def Exec(self, feedback_fn):
2438
    """Change the parameters of the cluster.
2439

2440
    """
2441
    if self.op.vg_name is not None:
2442
      new_volume = self.op.vg_name
2443
      if not new_volume:
2444
        new_volume = None
2445
      if new_volume != self.cfg.GetVGName():
2446
        self.cfg.SetVGName(new_volume)
2447
      else:
2448
        feedback_fn("Cluster LVM configuration already in desired"
2449
                    " state, not changing")
2450
    if self.op.hvparams:
2451
      self.cluster.hvparams = self.new_hvparams
2452
    if self.op.os_hvp:
2453
      self.cluster.os_hvp = self.new_os_hvp
2454
    if self.op.enabled_hypervisors is not None:
2455
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2456
    if self.op.beparams:
2457
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2458
    if self.op.nicparams:
2459
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2460

    
2461
    if self.op.candidate_pool_size is not None:
2462
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2463
      # we need to update the pool size here, otherwise the save will fail
2464
      _AdjustCandidatePool(self, [])
2465

    
2466
    if self.op.maintain_node_health is not None:
2467
      self.cluster.maintain_node_health = self.op.maintain_node_health
2468

    
2469
    if self.op.uid_pool is not None:
2470
      self.cluster.uid_pool = self.op.uid_pool
2471

    
2472
    self.cfg.Update(self.cluster, feedback_fn)
2473

    
2474

    
2475
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2476
  """Distribute additional files which are part of the cluster configuration.
2477

2478
  ConfigWriter takes care of distributing the config and ssconf files, but
2479
  there are more files which should be distributed to all nodes. This function
2480
  makes sure those are copied.
2481

2482
  @param lu: calling logical unit
2483
  @param additional_nodes: list of nodes not in the config to distribute to
2484

2485
  """
2486
  # 1. Gather target nodes
2487
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2488
  dist_nodes = lu.cfg.GetOnlineNodeList()
2489
  if additional_nodes is not None:
2490
    dist_nodes.extend(additional_nodes)
2491
  if myself.name in dist_nodes:
2492
    dist_nodes.remove(myself.name)
2493

    
2494
  # 2. Gather files to distribute
2495
  dist_files = set([constants.ETC_HOSTS,
2496
                    constants.SSH_KNOWN_HOSTS_FILE,
2497
                    constants.RAPI_CERT_FILE,
2498
                    constants.RAPI_USERS_FILE,
2499
                    constants.CONFD_HMAC_KEY,
2500
                   ])
2501

    
2502
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2503
  for hv_name in enabled_hypervisors:
2504
    hv_class = hypervisor.GetHypervisor(hv_name)
2505
    dist_files.update(hv_class.GetAncillaryFiles())
2506

    
2507
  # 3. Perform the files upload
2508
  for fname in dist_files:
2509
    if os.path.exists(fname):
2510
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2511
      for to_node, to_result in result.items():
2512
        msg = to_result.fail_msg
2513
        if msg:
2514
          msg = ("Copy of file %s to node %s failed: %s" %
2515
                 (fname, to_node, msg))
2516
          lu.proc.LogWarning(msg)
2517

    
2518

    
2519
class LURedistributeConfig(NoHooksLU):
2520
  """Force the redistribution of cluster configuration.
2521

2522
  This is a very simple LU.
2523

2524
  """
2525
  _OP_REQP = []
2526
  REQ_BGL = False
2527

    
2528
  def ExpandNames(self):
2529
    self.needed_locks = {
2530
      locking.LEVEL_NODE: locking.ALL_SET,
2531
    }
2532
    self.share_locks[locking.LEVEL_NODE] = 1
2533

    
2534
  def CheckPrereq(self):
2535
    """Check prerequisites.
2536

2537
    """
2538

    
2539
  def Exec(self, feedback_fn):
2540
    """Redistribute the configuration.
2541

2542
    """
2543
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2544
    _RedistributeAncillaryFiles(self)
2545

    
2546

    
2547
def _WaitForSync(lu, instance, oneshot=False):
2548
  """Sleep and poll for an instance's disk to sync.
2549

2550
  """
2551
  if not instance.disks:
2552
    return True
2553

    
2554
  if not oneshot:
2555
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2556

    
2557
  node = instance.primary_node
2558

    
2559
  for dev in instance.disks:
2560
    lu.cfg.SetDiskID(dev, node)
2561

    
2562
  # TODO: Convert to utils.Retry
2563

    
2564
  retries = 0
2565
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2566
  while True:
2567
    max_time = 0
2568
    done = True
2569
    cumul_degraded = False
2570
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2571
    msg = rstats.fail_msg
2572
    if msg:
2573
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2574
      retries += 1
2575
      if retries >= 10:
2576
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2577
                                 " aborting." % node)
2578
      time.sleep(6)
2579
      continue
2580
    rstats = rstats.payload
2581
    retries = 0
2582
    for i, mstat in enumerate(rstats):
2583
      if mstat is None:
2584
        lu.LogWarning("Can't compute data for node %s/%s",
2585
                           node, instance.disks[i].iv_name)
2586
        continue
2587

    
2588
      cumul_degraded = (cumul_degraded or
2589
                        (mstat.is_degraded and mstat.sync_percent is None))
2590
      if mstat.sync_percent is not None:
2591
        done = False
2592
        if mstat.estimated_time is not None:
2593
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2594
          max_time = mstat.estimated_time
2595
        else:
2596
          rem_time = "no time estimate"
2597
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2598
                        (instance.disks[i].iv_name, mstat.sync_percent,
2599
                         rem_time))
2600

    
2601
    # if we're done but degraded, let's do a few small retries, to
2602
    # make sure we see a stable and not transient situation; therefore
2603
    # we force restart of the loop
2604
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2605
      logging.info("Degraded disks found, %d retries left", degr_retries)
2606
      degr_retries -= 1
2607
      time.sleep(1)
2608
      continue
2609

    
2610
    if done or oneshot:
2611
      break
2612

    
2613
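    # poll again after the reported estimated sync time, but wait at most
    # 60 seconds between iterations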
    time.sleep(min(60, max_time))
2614

    
2615
  if done:
2616
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2617
  return not cumul_degraded
2618

    
2619

    
2620
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2621
  """Check that mirrors are not degraded.
2622

2623
  The ldisk parameter, if True, will change the test from the
2624
  is_degraded attribute (which represents overall non-ok status for
2625
  the device(s)) to the ldisk (representing the local storage status).
2626

2627
  """
2628
  lu.cfg.SetDiskID(dev, node)
2629

    
2630
  result = True
2631

    
2632
  if on_primary or dev.AssembleOnSecondary():
2633
    rstats = lu.rpc.call_blockdev_find(node, dev)
2634
    msg = rstats.fail_msg
2635
    if msg:
2636
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2637
      result = False
2638
    elif not rstats.payload:
2639
      lu.LogWarning("Can't find disk on node %s", node)
2640
      result = False
2641
    else:
2642
      if ldisk:
2643
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2644
      else:
2645
        result = result and not rstats.payload.is_degraded
2646

    
2647
  if dev.children:
2648
    for child in dev.children:
2649
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2650

    
2651
  return result
2652

    
2653

    
2654
class LUDiagnoseOS(NoHooksLU):
2655
  """Logical unit for OS diagnose/query.
2656

2657
  """
2658
  _OP_REQP = ["output_fields", "names"]
2659
  REQ_BGL = False
2660
  _FIELDS_STATIC = utils.FieldSet()
2661
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2662
  # Fields that need calculation of global os validity
2663
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2664

    
2665
  def ExpandNames(self):
2666
    if self.op.names:
2667
      raise errors.OpPrereqError("Selective OS query not supported",
2668
                                 errors.ECODE_INVAL)
2669

    
2670
    _CheckOutputFields(static=self._FIELDS_STATIC,
2671
                       dynamic=self._FIELDS_DYNAMIC,
2672
                       selected=self.op.output_fields)
2673

    
2674
    # Lock all nodes, in shared mode
2675
    # Temporary removal of locks, should be reverted later
2676
    # TODO: reintroduce locks when they are lighter-weight
2677
    self.needed_locks = {}
2678
    #self.share_locks[locking.LEVEL_NODE] = 1
2679
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2680

    
2681
  def CheckPrereq(self):
2682
    """Check prerequisites.
2683

2684
    """
2685

    
2686
  @staticmethod
2687
  def _DiagnoseByOS(rlist):
2688
    """Remaps a per-node return list into an a per-os per-node dictionary
2689

2690
    @param rlist: a map with node names as keys and OS objects as values
2691

2692
    @rtype: dict
2693
    @return: a dictionary with osnames as keys and as value another map, with
2694
        nodes as keys and tuples of (path, status, diagnose, variants) as
        values, eg::
2695

2696
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2697
                                     (/srv/..., False, "invalid api")],
2698
                           "node2": [(/srv/..., True, "")]}
2699
          }
2700

2701
    """
2702
    all_os = {}
2703
    # we build here the list of nodes that didn't fail the RPC (at RPC
2704
    # level), so that nodes with a non-responding node daemon don't
2705
    # make all OSes invalid
2706
    good_nodes = [node_name for node_name in rlist
2707
                  if not rlist[node_name].fail_msg]
2708
    for node_name, nr in rlist.items():
2709
      if nr.fail_msg or not nr.payload:
2710
        continue
2711
      for name, path, status, diagnose, variants in nr.payload:
2712
        if name not in all_os:
2713
          # build a list of nodes for this os containing empty lists
2714
          # for each node in node_list
2715
          all_os[name] = {}
2716
          for nname in good_nodes:
2717
            all_os[name][nname] = []
2718
        all_os[name][node_name].append((path, status, diagnose, variants))
2719
    return all_os
2720

    
2721
  def Exec(self, feedback_fn):
2722
    """Compute the list of OSes.
2723

2724
    """
2725
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2726
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2727
    pol = self._DiagnoseByOS(node_data)
2728
    output = []
2729
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2730
    calc_variants = "variants" in self.op.output_fields
2731

    
2732
    for os_name, os_data in pol.items():
2733
      row = []
2734
      if calc_valid:
2735
        valid = True
2736
        variants = None
2737
        for osl in os_data.values():
2738
          valid = valid and osl and osl[0][1]
2739
          if not valid:
2740
            variants = None
2741
            break
2742
          if calc_variants:
2743
            node_variants = osl[0][3]
2744
            if variants is None:
2745
              variants = node_variants
2746
            else:
2747
              variants = [v for v in variants if v in node_variants]
2748

    
2749
      for field in self.op.output_fields:
2750
        if field == "name":
2751
          val = os_name
2752
        elif field == "valid":
2753
          val = valid
2754
        elif field == "node_status":
2755
          # this is just a copy of the dict
2756
          val = {}
2757
          for node_name, nos_list in os_data.items():
2758
            val[node_name] = nos_list
2759
        elif field == "variants":
2760
          val = variants
2761
        else:
2762
          raise errors.ParameterError(field)
2763
        row.append(val)
2764
      output.append(row)
2765

    
2766
    return output
2767

    
2768

    
2769
class LURemoveNode(LogicalUnit):
2770
  """Logical unit for removing a node.
2771

2772
  """
2773
  HPATH = "node-remove"
2774
  HTYPE = constants.HTYPE_NODE
2775
  _OP_REQP = ["node_name"]
2776

    
2777
  def BuildHooksEnv(self):
2778
    """Build hooks env.
2779

2780
    This doesn't run on the target node in the pre phase as a failed
2781
    node would then be impossible to remove.
2782

2783
    """
2784
    env = {
2785
      "OP_TARGET": self.op.node_name,
2786
      "NODE_NAME": self.op.node_name,
2787
      }
2788
    all_nodes = self.cfg.GetNodeList()
2789
    try:
2790
      all_nodes.remove(self.op.node_name)
2791
    except ValueError:
2792
      logging.warning("Node %s which is about to be removed not found"
2793
                      " in the all nodes list", self.op.node_name)
2794
    return env, all_nodes, all_nodes
2795

    
2796
  def CheckPrereq(self):
2797
    """Check prerequisites.
2798

2799
    This checks:
2800
     - the node exists in the configuration
2801
     - it does not have primary or secondary instances
2802
     - it's not the master
2803

2804
    Any errors are signaled by raising errors.OpPrereqError.
2805

2806
    """
2807
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2808
    node = self.cfg.GetNodeInfo(self.op.node_name)
2809
    assert node is not None
2810

    
2811
    instance_list = self.cfg.GetInstanceList()
2812

    
2813
    masternode = self.cfg.GetMasterNode()
2814
    if node.name == masternode:
2815
      raise errors.OpPrereqError("Node is the master node,"
2816
                                 " you need to failover first.",
2817
                                 errors.ECODE_INVAL)
2818

    
2819
    for instance_name in instance_list:
2820
      instance = self.cfg.GetInstanceInfo(instance_name)
2821
      if node.name in instance.all_nodes:
2822
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2823
                                   " please remove first." % instance_name,
2824
                                   errors.ECODE_INVAL)
2825
    self.op.node_name = node.name
2826
    self.node = node
2827

    
2828
  def Exec(self, feedback_fn):
2829
    """Removes the node from the cluster.
2830

2831
    """
2832
    node = self.node
2833
    logging.info("Stopping the node daemon and removing configs from node %s",
2834
                 node.name)
2835

    
2836
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2837

    
2838
    # Promote nodes to master candidate as needed
2839
    _AdjustCandidatePool(self, exceptions=[node.name])
2840
    self.context.RemoveNode(node.name)
2841

    
2842
    # Run post hooks on the node before it's removed
2843
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2844
    try:
2845
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2846
    except:
2847
      # pylint: disable-msg=W0702
2848
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2849

    
2850
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2851
    msg = result.fail_msg
2852
    if msg:
2853
      self.LogWarning("Errors encountered on the remote node while leaving"
2854
                      " the cluster: %s", msg)
2855

    
2856

    
2857
class LUQueryNodes(NoHooksLU):
2858
  """Logical unit for querying nodes.
2859

2860
  """
2861
  # pylint: disable-msg=W0142
2862
  _OP_REQP = ["output_fields", "names", "use_locking"]
2863
  REQ_BGL = False
2864

    
2865
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2866
                    "master_candidate", "offline", "drained"]
2867

    
2868
  _FIELDS_DYNAMIC = utils.FieldSet(
2869
    "dtotal", "dfree",
2870
    "mtotal", "mnode", "mfree",
2871
    "bootid",
2872
    "ctotal", "cnodes", "csockets",
2873
    )
2874

    
2875
  _FIELDS_STATIC = utils.FieldSet(*[
2876
    "pinst_cnt", "sinst_cnt",
2877
    "pinst_list", "sinst_list",
2878
    "pip", "sip", "tags",
2879
    "master",
2880
    "role"] + _SIMPLE_FIELDS
2881
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
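    # lv_by_node maps each instance to a {node_name: [logical volume names]}
    # dict (the shape assumed here follows the lookups below), so a volume
    # reported by a node can be attributed to the instance owning it.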

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def CheckArguments(self):
    _CheckStorageType(self.op.storage_type)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    _CheckStorageType(self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Modifies a storage volume on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckArguments(self):
    # validate/normalize the node name
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]
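      # Note: the order of keyfiles must match the positional arguments of
      # call_node_add below (host DSA private/public key, host RSA
      # private/public key, then the private/public SSH key of the
      # GANETI_RUNAS user read via ssh.GetUserFiles above).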

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    _CheckBooleanOpField(self.op, 'auto_promote')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

    self.lock_all = self.op.auto_promote and self.might_demote


  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)


    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False
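    # "result" collects (parameter, new value) feedback pairs that are
    # returned to the caller, e.g. [("offline", "True"), ("master_candidate",
    # "auto-demotion due to offline")] (illustrative values only).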

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params
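    # The loop above keeps only the enabled hypervisors; for example
    # (illustrative data only), with enabled_hypervisors == ["xen-pvm"] an
    # os_hvp entry of {"debian": {"xen-pvm": {...}, "kvm": {...}}} is
    # reduced to {"debian": {"xen-pvm": {...}}}.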

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
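    # Each device_info entry is (primary node, disk iv_name, device path as
    # returned by blockdev_assemble), e.g. ("node1.example.com", "disk/0",
    # "/dev/drbd0") -- example values only.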

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem,