#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    # support for dry-run
    self.dry_run_result = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this
    will be handled by the hooks runner. Also note additional keys will
    be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, an empty list (and not None)
    should be used.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
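

# --- Illustrative sketch (not part of the original module) -----------------
# A minimal, hypothetical LU showing how the base-class pieces above fit
# together: ExpandNames declares locks, DeclareLocks recalculates node locks
# via _LockInstancesNodes, CheckPrereq validates cluster state and Exec does
# the work.  The class, its opcode and the "instance_name" field are made up
# for illustration; they do not correspond to a real Ganeti opcode.

class LUExampleNoop(NoHooksLU):
  """Hypothetical example LU, for illustration only.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # Canonicalize self.op.instance_name and lock that instance
    self._ExpandAndLockInstance()
    # Node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Lock all nodes of the already-locked instance
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for %s" % self.instance.name)
    return self.instance.name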


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose names are to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @type selected: list
  @param selected: the list of selected fields to check

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
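

# --- Illustrative sketch (not part of the original module) -----------------
# A sample invocation of _BuildInstanceHookEnv with made-up values, showing
# the kind of environment dictionary that hook scripts eventually see (the
# hooks runner later prefixes every key with "GANETI_").  This helper and all
# the values are illustrative only.

def _ExampleInstanceHookEnv():
  """Return a sample hook environment (illustration only).

  """
  env = _BuildInstanceHookEnv(name="inst1.example.com",
                              primary_node="node1.example.com",
                              secondary_nodes=["node2.example.com"],
                              os_type="debootstrap",
                              status=True,
                              memory=512,
                              vcpus=1,
                              nics=[("192.0.2.10", "aa:00:00:00:00:01",
                                     constants.NIC_MODE_BRIDGED, "xen-br0")],
                              disk_template=constants.DT_DRBD8,
                              disks=[(10240, "rw")],
                              bep={constants.BE_MEMORY: 512},
                              hvp={},
                              hypervisor_name=constants.HT_XEN_PVM)
  # env now maps e.g. "INSTANCE_NAME" -> "inst1.example.com",
  # "INSTANCE_NIC0_BRIDGE" -> "xen-br0" and "INSTANCE_DISK_COUNT" -> 1
  return env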

def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics

def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)
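

# --- Illustrative sketch (not part of the original module) -----------------
# The NIC helpers above all follow the same pattern: take the cluster-level
# nicparams for a profile and overlay the per-NIC overrides with
# objects.FillDict.  A made-up example of that layering; this helper and the
# parameter values are illustrative only.

def _ExampleFilledNicParams():
  """Return merged NIC parameters for a sample NIC (illustration only).

  """
  cluster_defaults = {
    constants.NIC_MODE: constants.NIC_MODE_BRIDGED,
    constants.NIC_LINK: "xen-br0",
    }
  # Per-NIC overrides; only the link differs from the cluster default
  nic_overrides = {constants.NIC_LINK: "xen-br1"}
  filled = objects.FillDict(cluster_defaults, nic_overrides)
  # filled[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED
  # filled[constants.NIC_LINK] == "xen-br1"
  return filled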


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in the
        form of minor: (instance, must_exist), which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non-redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
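

# --- Illustrative sketch (not part of the original module) -----------------
# The N+1 check in LUVerifyCluster._VerifyNPlusOneMemory boils down to: for
# every node, group the instances it is secondary for by their primary node,
# and make sure the node's free memory covers each group on its own.  A
# simplified, standalone version over made-up data; this helper and the
# numbers are illustrative only.

def _ExampleNPlusOneCheck():
  """Return the (node, primary) pairs violating N+1 (illustration only).

  """
  # mfree in MiB; sinst-by-pnode maps primary -> list of instance memory sizes
  node_info = {
    "node1": {"mfree": 2048, "sinst-by-pnode": {"node2": [512, 1024]}},
    "node2": {"mfree": 512, "sinst-by-pnode": {"node1": [1024]}},
    }
  failures = []
  for node, info in node_info.items():
    for prinode, mem_sizes in info["sinst-by-pnode"].items():
      if info["mfree"] < sum(mem_sizes):
        failures.append((node, prinode))
  return failures  # here: [("node2", "node1")]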


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
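

# --- Illustrative sketch (not part of the original module) -----------------
# How a caller might interpret the three-part result of LUVerifyDisks.Exec
# (node errors, instances needing activate-disks, missing volumes).  This
# helper and the feedback messages are made up for illustration.

def _ExampleReportVerifyDisks(result, feedback_fn):
  """Pretty-print a LUVerifyDisks result tuple (illustration only).

  """
  nodes, instances, missing = result
  for node, error in nodes.items():
    feedback_fn("Node %s could not be queried: %s" % (node, error))
  for iname in instances:
    feedback_fn("Instance %s has inactive disks, run activate-disks" % iname)
  for iname, volumes in missing.items():
    for node, volume in volumes:
      feedback_fn("Instance %s: volume %s missing on node %s" %
                  (iname, volume, node))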
1366

    
1367

    
1368
class LURenameCluster(LogicalUnit):
1369
  """Rename the cluster.
1370

1371
  """
1372
  HPATH = "cluster-rename"
1373
  HTYPE = constants.HTYPE_CLUSTER
1374
  _OP_REQP = ["name"]
1375

    
1376
  def BuildHooksEnv(self):
1377
    """Build hooks env.
1378

1379
    """
1380
    env = {
1381
      "OP_TARGET": self.cfg.GetClusterName(),
1382
      "NEW_NAME": self.op.name,
1383
      }
1384
    mn = self.cfg.GetMasterNode()
1385
    return env, [mn], [mn]
1386

    
1387
  def CheckPrereq(self):
1388
    """Verify that the passed name is a valid one.
1389

1390
    """
1391
    hostname = utils.HostInfo(self.op.name)
1392

    
1393
    new_name = hostname.name
1394
    self.ip = new_ip = hostname.ip
1395
    old_name = self.cfg.GetClusterName()
1396
    old_ip = self.cfg.GetMasterIP()
1397
    if new_name == old_name and new_ip == old_ip:
1398
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1399
                                 " cluster has changed")
1400
    if new_ip != old_ip:
1401
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1402
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1403
                                   " reachable on the network. Aborting." %
1404
                                   new_ip)
1405

    
1406
    self.op.name = new_name
1407

    
1408
  def Exec(self, feedback_fn):
1409
    """Rename the cluster.
1410

1411
    """
1412
    clustername = self.op.name
1413
    ip = self.ip
1414

    
1415
    # shutdown the master IP
1416
    master = self.cfg.GetMasterNode()
1417
    result = self.rpc.call_node_stop_master(master, False)
1418
    result.Raise("Could not disable the master role")
1419

    
1420
    try:
1421
      cluster = self.cfg.GetClusterInfo()
1422
      cluster.cluster_name = clustername
1423
      cluster.master_ip = ip
1424
      self.cfg.Update(cluster)
1425

    
1426
      # update the known hosts file
1427
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1428
      node_list = self.cfg.GetNodeList()
1429
      try:
1430
        node_list.remove(master)
1431
      except ValueError:
1432
        pass
1433
      result = self.rpc.call_upload_file(node_list,
1434
                                         constants.SSH_KNOWN_HOSTS_FILE)
1435
      for to_node, to_result in result.iteritems():
1436
        msg = to_result.fail_msg
1437
        if msg:
1438
          msg = ("Copy of file %s to node %s failed: %s" %
1439
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1440
          self.proc.LogWarning(msg)
1441

    
1442
    finally:
1443
      result = self.rpc.call_node_start_master(master, False, False)
1444
      msg = result.fail_msg
1445
      if msg:
1446
        self.LogWarning("Could not re-enable the master role on"
1447
                        " the master, please restart manually: %s", msg)
1448

    
1449

    
1450
def _RecursiveCheckIfLVMBased(disk):
1451
  """Check if the given disk or its children are lvm-based.
1452

1453
  @type disk: L{objects.Disk}
1454
  @param disk: the disk to check
1455
  @rtype: boolean
1456
  @return: boolean indicating whether a LD_LV dev_type was found or not
1457

1458
  """
1459
  if disk.children:
1460
    for chdisk in disk.children:
1461
      if _RecursiveCheckIfLVMBased(chdisk):
1462
        return True
1463
  return disk.dev_type == constants.LD_LV
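
# Illustrative sketch (hypothetical disk objects; constants.LD_DRBD8 assumed):
# a DRBD disk backed by two LVs is reported as lvm-based because the recursion
# finds an LD_LV child:
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, children=[])
#   lv_b = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)   # -> True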
1464

    
1465

    
1466
class LUSetClusterParams(LogicalUnit):
1467
  """Change the parameters of the cluster.
1468

1469
  """
1470
  HPATH = "cluster-modify"
1471
  HTYPE = constants.HTYPE_CLUSTER
1472
  _OP_REQP = []
1473
  REQ_BGL = False
1474

    
1475
  def CheckArguments(self):
1476
    """Check parameters
1477

1478
    """
1479
    if not hasattr(self.op, "candidate_pool_size"):
1480
      self.op.candidate_pool_size = None
1481
    if self.op.candidate_pool_size is not None:
1482
      try:
1483
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1484
      except (ValueError, TypeError), err:
1485
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1486
                                   str(err))
1487
      if self.op.candidate_pool_size < 1:
1488
        raise errors.OpPrereqError("At least one master candidate needed")
1489

    
1490
  def ExpandNames(self):
1491
    # FIXME: in the future maybe other cluster params won't require checking on
1492
    # all nodes to be modified.
1493
    self.needed_locks = {
1494
      locking.LEVEL_NODE: locking.ALL_SET,
1495
    }
1496
    self.share_locks[locking.LEVEL_NODE] = 1
1497

    
1498
  def BuildHooksEnv(self):
1499
    """Build hooks env.
1500

1501
    """
1502
    env = {
1503
      "OP_TARGET": self.cfg.GetClusterName(),
1504
      "NEW_VG_NAME": self.op.vg_name,
1505
      }
1506
    mn = self.cfg.GetMasterNode()
1507
    return env, [mn], [mn]
1508

    
1509
  def CheckPrereq(self):
1510
    """Check prerequisites.
1511

1512
    This checks whether the given params don't conflict and
1513
    if the given volume group is valid.
1514

1515
    """
1516
    if self.op.vg_name is not None and not self.op.vg_name:
1517
      instances = self.cfg.GetAllInstancesInfo().values()
1518
      for inst in instances:
1519
        for disk in inst.disks:
1520
          if _RecursiveCheckIfLVMBased(disk):
1521
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1522
                                       " lvm-based instances exist")
1523

    
1524
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1525

    
1526
    # if vg_name not None, checks given volume group on all nodes
1527
    if self.op.vg_name:
1528
      vglist = self.rpc.call_vg_list(node_list)
1529
      for node in node_list:
1530
        msg = vglist[node].fail_msg
1531
        if msg:
1532
          # ignoring down node
1533
          self.LogWarning("Error while gathering data on node %s"
1534
                          " (ignoring node): %s", node, msg)
1535
          continue
1536
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1537
                                              self.op.vg_name,
1538
                                              constants.MIN_VG_SIZE)
1539
        if vgstatus:
1540
          raise errors.OpPrereqError("Error on node '%s': %s" %
1541
                                     (node, vgstatus))
1542

    
1543
    self.cluster = cluster = self.cfg.GetClusterInfo()
1544
    # validate params changes
1545
    if self.op.beparams:
1546
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
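      # FillDict returns a copy of the first dict updated with the second,
      # e.g. (hypothetical values)
      #   objects.FillDict({"memory": 128, "vcpus": 1}, {"memory": 512})
      # yields {"memory": 512, "vcpus": 1}, leaving the cluster defaults
      # untouched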
1547
      self.new_beparams = objects.FillDict(
1548
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1549

    
1550
    if self.op.nicparams:
1551
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1552
      self.new_nicparams = objects.FillDict(
1553
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1554
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1555

    
1556
    # hypervisor list/parameters
1557
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1558
    if self.op.hvparams:
1559
      if not isinstance(self.op.hvparams, dict):
1560
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1561
      for hv_name, hv_dict in self.op.hvparams.items():
1562
        if hv_name not in self.new_hvparams:
1563
          self.new_hvparams[hv_name] = hv_dict
1564
        else:
1565
          self.new_hvparams[hv_name].update(hv_dict)
1566

    
1567
    if self.op.enabled_hypervisors is not None:
1568
      self.hv_list = self.op.enabled_hypervisors
1569
    else:
1570
      self.hv_list = cluster.enabled_hypervisors
1571

    
1572
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1573
      # either the enabled list has changed, or the parameters have, validate
1574
      for hv_name, hv_params in self.new_hvparams.items():
1575
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1576
            (self.op.enabled_hypervisors and
1577
             hv_name in self.op.enabled_hypervisors)):
1578
          # either this is a new hypervisor, or its parameters have changed
1579
          hv_class = hypervisor.GetHypervisor(hv_name)
1580
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1581
          hv_class.CheckParameterSyntax(hv_params)
1582
          _CheckHVParams(self, node_list, hv_name, hv_params)
1583

    
1584
  def Exec(self, feedback_fn):
1585
    """Change the parameters of the cluster.
1586

1587
    """
1588
    if self.op.vg_name is not None:
1589
      new_volume = self.op.vg_name
1590
      if not new_volume:
1591
        new_volume = None
1592
      if new_volume != self.cfg.GetVGName():
1593
        self.cfg.SetVGName(new_volume)
1594
      else:
1595
        feedback_fn("Cluster LVM configuration already in desired"
1596
                    " state, not changing")
1597
    if self.op.hvparams:
1598
      self.cluster.hvparams = self.new_hvparams
1599
    if self.op.enabled_hypervisors is not None:
1600
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1601
    if self.op.beparams:
1602
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1603
    if self.op.nicparams:
1604
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1605

    
1606
    if self.op.candidate_pool_size is not None:
1607
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1608
      # we need to update the pool size here, otherwise the save will fail
1609
      _AdjustCandidatePool(self)
1610

    
1611
    self.cfg.Update(self.cluster)
1612

    
1613

    
1614
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1615
  """Distribute additional files which are part of the cluster configuration.
1616

1617
  ConfigWriter takes care of distributing the config and ssconf files, but
1618
  there are more files which should be distributed to all nodes. This function
1619
  makes sure those are copied.
1620

1621
  @param lu: calling logical unit
1622
  @param additional_nodes: list of nodes not in the config to distribute to
1623

1624
  """
1625
  # 1. Gather target nodes
1626
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1627
  dist_nodes = lu.cfg.GetNodeList()
1628
  if additional_nodes is not None:
1629
    dist_nodes.extend(additional_nodes)
1630
  if myself.name in dist_nodes:
1631
    dist_nodes.remove(myself.name)
1632
  # 2. Gather files to distribute
1633
  dist_files = set([constants.ETC_HOSTS,
1634
                    constants.SSH_KNOWN_HOSTS_FILE,
1635
                    constants.RAPI_CERT_FILE,
1636
                    constants.RAPI_USERS_FILE,
1637
                   ])
1638

    
1639
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1640
  for hv_name in enabled_hypervisors:
1641
    hv_class = hypervisor.GetHypervisor(hv_name)
1642
    dist_files.update(hv_class.GetAncillaryFiles())
1643

    
1644
  # 3. Perform the files upload
1645
  for fname in dist_files:
1646
    if os.path.exists(fname):
1647
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1648
      for to_node, to_result in result.items():
1649
        msg = to_result.fail_msg
1650
        if msg:
1651
          msg = ("Copy of file %s to node %s failed: %s" %
1652
                 (fname, to_node, msg))
1653
          lu.proc.LogWarning(msg)
1654

    
1655

    
1656
class LURedistributeConfig(NoHooksLU):
1657
  """Force the redistribution of cluster configuration.
1658

1659
  This is a very simple LU.
1660

1661
  """
1662
  _OP_REQP = []
1663
  REQ_BGL = False
1664

    
1665
  def ExpandNames(self):
1666
    self.needed_locks = {
1667
      locking.LEVEL_NODE: locking.ALL_SET,
1668
    }
1669
    self.share_locks[locking.LEVEL_NODE] = 1
1670

    
1671
  def CheckPrereq(self):
1672
    """Check prerequisites.
1673

1674
    """
1675

    
1676
  def Exec(self, feedback_fn):
1677
    """Redistribute the configuration.
1678

1679
    """
1680
    self.cfg.Update(self.cfg.GetClusterInfo())
1681
    _RedistributeAncillaryFiles(self)
1682

    
1683

    
1684
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1685
  """Sleep and poll for an instance's disk to sync.
1686

1687
  """
1688
  if not instance.disks:
1689
    return True
1690

    
1691
  if not oneshot:
1692
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1693

    
1694
  node = instance.primary_node
1695

    
1696
  for dev in instance.disks:
1697
    lu.cfg.SetDiskID(dev, node)
1698

    
1699
  retries = 0
1700
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1701
  while True:
1702
    max_time = 0
1703
    done = True
1704
    cumul_degraded = False
1705
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1706
    msg = rstats.fail_msg
1707
    if msg:
1708
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1709
      retries += 1
1710
      if retries >= 10:
1711
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1712
                                 " aborting." % node)
1713
      time.sleep(6)
1714
      continue
1715
    rstats = rstats.payload
1716
    retries = 0
1717
    for i, mstat in enumerate(rstats):
1718
      if mstat is None:
1719
        lu.LogWarning("Can't compute data for node %s/%s",
1720
                           node, instance.disks[i].iv_name)
1721
        continue
1722
      # we ignore the ldisk parameter
1723
      perc_done, est_time, is_degraded, _ = mstat
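      # e.g. a hypothetical mstat of (78.5, 120, True, False) means the mirror
      # is 78.5% synced with roughly 120 seconds estimated remaining, while a
      # device that finished syncing but stayed degraded would report
      # perc_done=None together with is_degraded=True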
1724
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1725
      if perc_done is not None:
1726
        done = False
1727
        if est_time is not None:
1728
          rem_time = "%d estimated seconds remaining" % est_time
1729
          max_time = est_time
1730
        else:
1731
          rem_time = "no time estimate"
1732
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1733
                        (instance.disks[i].iv_name, perc_done, rem_time))
1734

    
1735
    # if we're done but degraded, let's do a few small retries, to
1736
    # make sure we see a stable and not transient situation; therefore
1737
    # we force restart of the loop
1738
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1739
      logging.info("Degraded disks found, %d retries left", degr_retries)
1740
      degr_retries -= 1
1741
      time.sleep(1)
1742
      continue
1743

    
1744
    if done or oneshot:
1745
      break
1746

    
1747
    time.sleep(min(60, max_time))
1748

    
1749
  if done:
1750
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1751
  return not cumul_degraded
1752

    
1753

    
1754
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1755
  """Check that mirrors are not degraded.
1756

1757
  The ldisk parameter, if True, will change the test from the
1758
  is_degraded attribute (which represents overall non-ok status for
1759
  the device(s)) to the ldisk (representing the local storage status).
1760

1761
  """
1762
  lu.cfg.SetDiskID(dev, node)
1763
  if ldisk:
1764
    idx = 6
1765
  else:
1766
    idx = 5
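  # idx selects a field in the blockdev_find result payload: position 5 holds
  # the overall is_degraded flag, position 6 the local-storage (ldisk) status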
1767

    
1768
  result = True
1769
  if on_primary or dev.AssembleOnSecondary():
1770
    rstats = lu.rpc.call_blockdev_find(node, dev)
1771
    msg = rstats.fail_msg
1772
    if msg:
1773
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1774
      result = False
1775
    elif not rstats.payload:
1776
      lu.LogWarning("Can't find disk on node %s", node)
1777
      result = False
1778
    else:
1779
      result = result and (not rstats.payload[idx])
1780
  if dev.children:
1781
    for child in dev.children:
1782
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1783

    
1784
  return result
1785

    
1786

    
1787
class LUDiagnoseOS(NoHooksLU):
1788
  """Logical unit for OS diagnose/query.
1789

1790
  """
1791
  _OP_REQP = ["output_fields", "names"]
1792
  REQ_BGL = False
1793
  _FIELDS_STATIC = utils.FieldSet()
1794
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1795

    
1796
  def ExpandNames(self):
1797
    if self.op.names:
1798
      raise errors.OpPrereqError("Selective OS query not supported")
1799

    
1800
    _CheckOutputFields(static=self._FIELDS_STATIC,
1801
                       dynamic=self._FIELDS_DYNAMIC,
1802
                       selected=self.op.output_fields)
1803

    
1804
    # Lock all nodes, in shared mode
1805
    # Temporary removal of locks, should be reverted later
1806
    # TODO: reintroduce locks when they are lighter-weight
1807
    self.needed_locks = {}
1808
    #self.share_locks[locking.LEVEL_NODE] = 1
1809
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1810

    
1811
  def CheckPrereq(self):
1812
    """Check prerequisites.
1813

1814
    """
1815

    
1816
  @staticmethod
1817
  def _DiagnoseByOS(node_list, rlist):
1818
    """Remaps a per-node return list into an a per-os per-node dictionary
1819

1820
    @param node_list: a list with the names of all nodes
1821
    @param rlist: a map with node names as keys and OS objects as values
1822

1823
    @rtype: dict
1824
    @return: a dictionary with osnames as keys and as value another map, with
1825
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
1826

1827
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1828
                                     (/srv/..., False, "invalid api")],
1829
                           "node2": [(/srv/..., True, "")]}
1830
          }
1831

1832
    """
1833
    all_os = {}
1834
    # we build here the list of nodes that didn't fail the RPC (at RPC
1835
    # level), so that nodes with a non-responding node daemon don't
1836
    # make all OSes invalid
1837
    good_nodes = [node_name for node_name in rlist
1838
                  if not rlist[node_name].fail_msg]
1839
    for node_name, nr in rlist.items():
1840
      if nr.fail_msg or not nr.payload:
1841
        continue
1842
      for name, path, status, diagnose in nr.payload:
1843
        if name not in all_os:
1844
          # build a list of nodes for this os containing empty lists
1845
          # for each node in node_list
1846
          all_os[name] = {}
1847
          for nname in good_nodes:
1848
            all_os[name][nname] = []
1849
        all_os[name][node_name].append((path, status, diagnose))
1850
    return all_os
1851

    
1852
  def Exec(self, feedback_fn):
1853
    """Compute the list of OSes.
1854

1855
    """
1856
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1857
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1858
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1859
    output = []
1860
    for os_name, os_data in pol.items():
1861
      row = []
1862
      for field in self.op.output_fields:
1863
        if field == "name":
1864
          val = os_name
1865
        elif field == "valid":
1866
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
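          # an OS is reported as valid only if every responding node has at
          # least one installation and the first one found has a good status
          # (the second element of its (path, status, diagnose) tuple)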
1867
        elif field == "node_status":
1868
          # this is just a copy of the dict
1869
          val = {}
1870
          for node_name, nos_list in os_data.items():
1871
            val[node_name] = nos_list
1872
        else:
1873
          raise errors.ParameterError(field)
1874
        row.append(val)
1875
      output.append(row)
1876

    
1877
    return output
1878

    
1879

    
1880
class LURemoveNode(LogicalUnit):
1881
  """Logical unit for removing a node.
1882

1883
  """
1884
  HPATH = "node-remove"
1885
  HTYPE = constants.HTYPE_NODE
1886
  _OP_REQP = ["node_name"]
1887

    
1888
  def BuildHooksEnv(self):
1889
    """Build hooks env.
1890

1891
    This doesn't run on the target node in the pre phase as a failed
1892
    node would then be impossible to remove.
1893

1894
    """
1895
    env = {
1896
      "OP_TARGET": self.op.node_name,
1897
      "NODE_NAME": self.op.node_name,
1898
      }
1899
    all_nodes = self.cfg.GetNodeList()
1900
    all_nodes.remove(self.op.node_name)
1901
    return env, all_nodes, all_nodes
1902

    
1903
  def CheckPrereq(self):
1904
    """Check prerequisites.
1905

1906
    This checks:
1907
     - the node exists in the configuration
1908
     - it does not have primary or secondary instances
1909
     - it's not the master
1910

1911
    Any errors are signaled by raising errors.OpPrereqError.
1912

1913
    """
1914
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1915
    if node is None:
1916
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1917

    
1918
    instance_list = self.cfg.GetInstanceList()
1919

    
1920
    masternode = self.cfg.GetMasterNode()
1921
    if node.name == masternode:
1922
      raise errors.OpPrereqError("Node is the master node,"
1923
                                 " you need to failover first.")
1924

    
1925
    for instance_name in instance_list:
1926
      instance = self.cfg.GetInstanceInfo(instance_name)
1927
      if node.name in instance.all_nodes:
1928
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1929
                                   " please remove first." % instance_name)
1930
    self.op.node_name = node.name
1931
    self.node = node
1932

    
1933
  def Exec(self, feedback_fn):
1934
    """Removes the node from the cluster.
1935

1936
    """
1937
    node = self.node
1938
    logging.info("Stopping the node daemon and removing configs from node %s",
1939
                 node.name)
1940

    
1941
    self.context.RemoveNode(node.name)
1942

    
1943
    result = self.rpc.call_node_leave_cluster(node.name)
1944
    msg = result.fail_msg
1945
    if msg:
1946
      self.LogWarning("Errors encountered on the remote node while leaving"
1947
                      " the cluster: %s", msg)
1948

    
1949
    # Promote nodes to master candidate as needed
1950
    _AdjustCandidatePool(self)
1951

    
1952

    
1953
class LUQueryNodes(NoHooksLU):
1954
  """Logical unit for querying nodes.
1955

1956
  """
1957
  _OP_REQP = ["output_fields", "names", "use_locking"]
1958
  REQ_BGL = False
1959
  _FIELDS_DYNAMIC = utils.FieldSet(
1960
    "dtotal", "dfree",
1961
    "mtotal", "mnode", "mfree",
1962
    "bootid",
1963
    "ctotal", "cnodes", "csockets",
1964
    )
1965

    
1966
  _FIELDS_STATIC = utils.FieldSet(
1967
    "name", "pinst_cnt", "sinst_cnt",
1968
    "pinst_list", "sinst_list",
1969
    "pip", "sip", "tags",
1970
    "serial_no",
1971
    "master_candidate",
1972
    "master",
1973
    "offline",
1974
    "drained",
1975
    "role",
1976
    )
1977

    
1978
  def ExpandNames(self):
1979
    _CheckOutputFields(static=self._FIELDS_STATIC,
1980
                       dynamic=self._FIELDS_DYNAMIC,
1981
                       selected=self.op.output_fields)
1982

    
1983
    self.needed_locks = {}
1984
    self.share_locks[locking.LEVEL_NODE] = 1
1985

    
1986
    if self.op.names:
1987
      self.wanted = _GetWantedNodes(self, self.op.names)
1988
    else:
1989
      self.wanted = locking.ALL_SET
1990

    
1991
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992
    self.do_locking = self.do_node_query and self.op.use_locking
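    # example: a query for only static fields such as "name" and "pip" needs
    # neither RPC nor node locks, while asking for a dynamic field like
    # "mfree" triggers a live node query and, with use_locking set, shared
    # node locks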
1993
    if self.do_locking:
1994
      # if we don't request only static fields, we need to lock the nodes
1995
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996

    
1997

    
1998
  def CheckPrereq(self):
1999
    """Check prerequisites.
2000

2001
    """
2002
    # The validation of the node list is done in _GetWantedNodes when the
    # list is not empty; an empty list needs no validation
2004
    pass
2005

    
2006
  def Exec(self, feedback_fn):
2007
    """Computes the list of nodes and their attributes.
2008

2009
    """
2010
    all_info = self.cfg.GetAllNodesInfo()
2011
    if self.do_locking:
2012
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013
    elif self.wanted != locking.ALL_SET:
2014
      nodenames = self.wanted
2015
      missing = set(nodenames).difference(all_info.keys())
2016
      if missing:
2017
        raise errors.OpExecError(
2018
          "Some nodes were removed before retrieving their data: %s" % missing)
2019
    else:
2020
      nodenames = all_info.keys()
2021

    
2022
    nodenames = utils.NiceSort(nodenames)
2023
    nodelist = [all_info[name] for name in nodenames]
2024

    
2025
    # begin data gathering
2026

    
2027
    if self.do_node_query:
2028
      live_data = {}
2029
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030
                                          self.cfg.GetHypervisorType())
2031
      for name in nodenames:
2032
        nodeinfo = node_data[name]
2033
        if not nodeinfo.fail_msg and nodeinfo.payload:
2034
          nodeinfo = nodeinfo.payload
2035
          fn = utils.TryConvert
2036
          live_data[name] = {
2037
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043
            "bootid": nodeinfo.get('bootid', None),
2044
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046
            }
2047
        else:
2048
          live_data[name] = {}
2049
    else:
2050
      live_data = dict.fromkeys(nodenames, {})
2051

    
2052
    node_to_primary = dict([(name, set()) for name in nodenames])
2053
    node_to_secondary = dict([(name, set()) for name in nodenames])
2054

    
2055
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056
                             "sinst_cnt", "sinst_list"))
2057
    if inst_fields & frozenset(self.op.output_fields):
2058
      instancelist = self.cfg.GetInstanceList()
2059

    
2060
      for instance_name in instancelist:
2061
        inst = self.cfg.GetInstanceInfo(instance_name)
2062
        if inst.primary_node in node_to_primary:
2063
          node_to_primary[inst.primary_node].add(inst.name)
2064
        for secnode in inst.secondary_nodes:
2065
          if secnode in node_to_secondary:
2066
            node_to_secondary[secnode].add(inst.name)
2067

    
2068
    master_node = self.cfg.GetMasterNode()
2069

    
2070
    # end data gathering
2071

    
2072
    output = []
2073
    for node in nodelist:
2074
      node_output = []
2075
      for field in self.op.output_fields:
2076
        if field == "name":
2077
          val = node.name
2078
        elif field == "pinst_list":
2079
          val = list(node_to_primary[node.name])
2080
        elif field == "sinst_list":
2081
          val = list(node_to_secondary[node.name])
2082
        elif field == "pinst_cnt":
2083
          val = len(node_to_primary[node.name])
2084
        elif field == "sinst_cnt":
2085
          val = len(node_to_secondary[node.name])
2086
        elif field == "pip":
2087
          val = node.primary_ip
2088
        elif field == "sip":
2089
          val = node.secondary_ip
2090
        elif field == "tags":
2091
          val = list(node.GetTags())
2092
        elif field == "serial_no":
2093
          val = node.serial_no
2094
        elif field == "master_candidate":
2095
          val = node.master_candidate
2096
        elif field == "master":
2097
          val = node.name == master_node
2098
        elif field == "offline":
2099
          val = node.offline
2100
        elif field == "drained":
2101
          val = node.drained
2102
        elif self._FIELDS_DYNAMIC.Matches(field):
2103
          val = live_data[node.name].get(field, None)
2104
        elif field == "role":
2105
          if node.name == master_node:
2106
            val = "M"
2107
          elif node.master_candidate:
2108
            val = "C"
2109
          elif node.drained:
2110
            val = "D"
2111
          elif node.offline:
2112
            val = "O"
2113
          else:
2114
            val = "R"
2115
        else:
2116
          raise errors.ParameterError(field)
2117
        node_output.append(val)
2118
      output.append(node_output)
2119

    
2120
    return output
2121

    
2122

    
2123
class LUQueryNodeVolumes(NoHooksLU):
2124
  """Logical unit for getting volumes on node(s).
2125

2126
  """
2127
  _OP_REQP = ["nodes", "output_fields"]
2128
  REQ_BGL = False
2129
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2130
  _FIELDS_STATIC = utils.FieldSet("node")
2131

    
2132
  def ExpandNames(self):
2133
    _CheckOutputFields(static=self._FIELDS_STATIC,
2134
                       dynamic=self._FIELDS_DYNAMIC,
2135
                       selected=self.op.output_fields)
2136

    
2137
    self.needed_locks = {}
2138
    self.share_locks[locking.LEVEL_NODE] = 1
2139
    if not self.op.nodes:
2140
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2141
    else:
2142
      self.needed_locks[locking.LEVEL_NODE] = \
2143
        _GetWantedNodes(self, self.op.nodes)
2144

    
2145
  def CheckPrereq(self):
2146
    """Check prerequisites.
2147

2148
    This checks that the fields required are valid output fields.
2149

2150
    """
2151
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2152

    
2153
  def Exec(self, feedback_fn):
2154
    """Computes the list of nodes and their attributes.
2155

2156
    """
2157
    nodenames = self.nodes
2158
    volumes = self.rpc.call_node_volumes(nodenames)
2159

    
2160
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2161
             in self.cfg.GetInstanceList()]
2162

    
2163
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
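    # lv_by_node maps each instance to the logical volumes it owns on each
    # node, so resolving the "instance" field below becomes a membership test
    # against the volume name reported by the node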
2164

    
2165
    output = []
2166
    for node in nodenames:
2167
      nresult = volumes[node]
2168
      if nresult.offline:
2169
        continue
2170
      msg = nresult.fail_msg
2171
      if msg:
2172
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2173
        continue
2174

    
2175
      node_vols = nresult.payload[:]
2176
      node_vols.sort(key=lambda vol: vol['dev'])
2177

    
2178
      for vol in node_vols:
2179
        node_output = []
2180
        for field in self.op.output_fields:
2181
          if field == "node":
2182
            val = node
2183
          elif field == "phys":
2184
            val = vol['dev']
2185
          elif field == "vg":
2186
            val = vol['vg']
2187
          elif field == "name":
2188
            val = vol['name']
2189
          elif field == "size":
2190
            val = int(float(vol['size']))
2191
          elif field == "instance":
2192
            for inst in ilist:
2193
              if node not in lv_by_node[inst]:
2194
                continue
2195
              if vol['name'] in lv_by_node[inst][node]:
2196
                val = inst.name
2197
                break
2198
            else:
2199
              val = '-'
2200
          else:
2201
            raise errors.ParameterError(field)
2202
          node_output.append(str(val))
2203

    
2204
        output.append(node_output)
2205

    
2206
    return output
2207

    
2208

    
2209
class LUAddNode(LogicalUnit):
2210
  """Logical unit for adding node to the cluster.
2211

2212
  """
2213
  HPATH = "node-add"
2214
  HTYPE = constants.HTYPE_NODE
2215
  _OP_REQP = ["node_name"]
2216

    
2217
  def BuildHooksEnv(self):
2218
    """Build hooks env.
2219

2220
    This will run on all nodes before, and on all nodes + the new node after.
2221

2222
    """
2223
    env = {
2224
      "OP_TARGET": self.op.node_name,
2225
      "NODE_NAME": self.op.node_name,
2226
      "NODE_PIP": self.op.primary_ip,
2227
      "NODE_SIP": self.op.secondary_ip,
2228
      }
2229
    nodes_0 = self.cfg.GetNodeList()
2230
    nodes_1 = nodes_0 + [self.op.node_name, ]
2231
    return env, nodes_0, nodes_1
2232

    
2233
  def CheckPrereq(self):
2234
    """Check prerequisites.
2235

2236
    This checks:
2237
     - the new node is not already in the config
2238
     - it is resolvable
2239
     - its parameters (single/dual homed) matches the cluster
2240

2241
    Any errors are signaled by raising errors.OpPrereqError.
2242

2243
    """
2244
    node_name = self.op.node_name
2245
    cfg = self.cfg
2246

    
2247
    dns_data = utils.HostInfo(node_name)
2248

    
2249
    node = dns_data.name
2250
    primary_ip = self.op.primary_ip = dns_data.ip
2251
    secondary_ip = getattr(self.op, "secondary_ip", None)
2252
    if secondary_ip is None:
2253
      secondary_ip = primary_ip
2254
    if not utils.IsValidIP(secondary_ip):
2255
      raise errors.OpPrereqError("Invalid secondary IP given")
2256
    self.op.secondary_ip = secondary_ip
2257

    
2258
    node_list = cfg.GetNodeList()
2259
    if not self.op.readd and node in node_list:
2260
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2261
                                 node)
2262
    elif self.op.readd and node not in node_list:
2263
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2264

    
2265
    for existing_node_name in node_list:
2266
      existing_node = cfg.GetNodeInfo(existing_node_name)
2267

    
2268
      if self.op.readd and node == existing_node_name:
2269
        if (existing_node.primary_ip != primary_ip or
2270
            existing_node.secondary_ip != secondary_ip):
2271
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2272
                                     " address configuration as before")
2273
        continue
2274

    
2275
      if (existing_node.primary_ip == primary_ip or
2276
          existing_node.secondary_ip == primary_ip or
2277
          existing_node.primary_ip == secondary_ip or
2278
          existing_node.secondary_ip == secondary_ip):
2279
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2280
                                   " existing node %s" % existing_node.name)
2281

    
2282
    # check that the type of the node (single versus dual homed) is the
2283
    # same as for the master
2284
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2285
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2286
    newbie_singlehomed = secondary_ip == primary_ip
2287
    if master_singlehomed != newbie_singlehomed:
2288
      if master_singlehomed:
2289
        raise errors.OpPrereqError("The master has no private ip but the"
2290
                                   " new node has one")
2291
      else:
2292
        raise errors.OpPrereqError("The master has a private ip but the"
2293
                                   " new node doesn't have one")
2294

    
2295
    # checks reachability
2296
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2297
      raise errors.OpPrereqError("Node not reachable by ping")
2298

    
2299
    if not newbie_singlehomed:
2300
      # check reachability from my secondary ip to newbie's secondary ip
2301
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2302
                           source=myself.secondary_ip):
2303
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2304
                                   " based ping to noded port")
2305

    
2306
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2307
    if self.op.readd:
2308
      exceptions = [node]
2309
    else:
2310
      exceptions = []
2311
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2312
    # the new node will increase mc_max with one, so:
2313
    mc_max = min(mc_max + 1, cp_size)
2314
    self.master_candidate = mc_now < mc_max
2315

    
2316
    if self.op.readd:
2317
      self.new_node = self.cfg.GetNodeInfo(node)
2318
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2319
    else:
2320
      self.new_node = objects.Node(name=node,
2321
                                   primary_ip=primary_ip,
2322
                                   secondary_ip=secondary_ip,
2323
                                   master_candidate=self.master_candidate,
2324
                                   offline=False, drained=False)
2325

    
2326
  def Exec(self, feedback_fn):
2327
    """Adds the new node to the cluster.
2328

2329
    """
2330
    new_node = self.new_node
2331
    node = new_node.name
2332

    
2333
    # for re-adds, reset the offline/drained/master-candidate flags;
2334
    # we need to reset here, otherwise offline would prevent RPC calls
2335
    # later in the procedure; this also means that if the re-add
2336
    # fails, we are left with a non-offlined, broken node
2337
    if self.op.readd:
2338
      new_node.drained = new_node.offline = False
2339
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2340
      # if we demote the node, we do cleanup later in the procedure
2341
      new_node.master_candidate = self.master_candidate
2342

    
2343
    # notify the user about any possible mc promotion
2344
    if new_node.master_candidate:
2345
      self.LogInfo("Node will be a master candidate")
2346

    
2347
    # check connectivity
2348
    result = self.rpc.call_version([node])[node]
2349
    result.Raise("Can't get version information from node %s" % node)
2350
    if constants.PROTOCOL_VERSION == result.payload:
2351
      logging.info("Communication to node %s fine, sw version %s match",
2352
                   node, result.payload)
2353
    else:
2354
      raise errors.OpExecError("Version mismatch master version %s,"
2355
                               " node version %s" %
2356
                               (constants.PROTOCOL_VERSION, result.payload))
2357

    
2358
    # setup ssh on node
2359
    logging.info("Copy ssh key to node %s", node)
2360
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2361
    keyarray = []
2362
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2363
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2364
                priv_key, pub_key]
2365

    
2366
    for i in keyfiles:
2367
      f = open(i, 'r')
2368
      try:
2369
        keyarray.append(f.read())
2370
      finally:
2371
        f.close()
2372

    
2373
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2374
                                    keyarray[2],
2375
                                    keyarray[3], keyarray[4], keyarray[5])
2376
    result.Raise("Cannot transfer ssh keys to the new node")
2377

    
2378
    # Add node to our /etc/hosts, and add key to known_hosts
2379
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2380
      utils.AddHostToEtcHosts(new_node.name)
2381

    
2382
    if new_node.secondary_ip != new_node.primary_ip:
2383
      result = self.rpc.call_node_has_ip_address(new_node.name,
2384
                                                 new_node.secondary_ip)
2385
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2386
                   prereq=True)
2387
      if not result.payload:
2388
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2389
                                 " you gave (%s). Please fix and re-run this"
2390
                                 " command." % new_node.secondary_ip)
2391

    
2392
    node_verify_list = [self.cfg.GetMasterNode()]
2393
    node_verify_param = {
2394
      'nodelist': [node],
2395
      # TODO: do a node-net-test as well?
2396
    }
2397

    
2398
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2399
                                       self.cfg.GetClusterName())
2400
    for verifier in node_verify_list:
2401
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2402
      nl_payload = result[verifier].payload['nodelist']
2403
      if nl_payload:
2404
        for failed in nl_payload:
2405
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2406
                      (verifier, nl_payload[failed]))
2407
        raise errors.OpExecError("ssh/hostname verification failed.")
2408

    
2409
    if self.op.readd:
2410
      _RedistributeAncillaryFiles(self)
2411
      self.context.ReaddNode(new_node)
2412
      # make sure we redistribute the config
2413
      self.cfg.Update(new_node)
2414
      # and make sure the new node will not have old files around
2415
      if not new_node.master_candidate:
2416
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2417
        msg = result.fail_msg
2418
        if msg:
2419
          self.LogWarning("Node failed to demote itself from master"
2420
                          " candidate status: %s" % msg)
2421
    else:
2422
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2423
      self.context.AddNode(new_node)
2424

    
2425

    
2426
class LUSetNodeParams(LogicalUnit):
2427
  """Modifies the parameters of a node.
2428

2429
  """
2430
  HPATH = "node-modify"
2431
  HTYPE = constants.HTYPE_NODE
2432
  _OP_REQP = ["node_name"]
2433
  REQ_BGL = False
2434

    
2435
  def CheckArguments(self):
2436
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2437
    if node_name is None:
2438
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2439
    self.op.node_name = node_name
2440
    _CheckBooleanOpField(self.op, 'master_candidate')
2441
    _CheckBooleanOpField(self.op, 'offline')
2442
    _CheckBooleanOpField(self.op, 'drained')
2443
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
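    # e.g. offline=True together with drained=True is rejected below, while
    # offline=True combined with master_candidate=False is accepted (at most
    # one of the three flags may be True)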
2444
    if all_mods.count(None) == 3:
2445
      raise errors.OpPrereqError("Please pass at least one modification")
2446
    if all_mods.count(True) > 1:
2447
      raise errors.OpPrereqError("Can't set the node into more than one"
2448
                                 " state at the same time")
2449

    
2450
  def ExpandNames(self):
2451
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2452

    
2453
  def BuildHooksEnv(self):
2454
    """Build hooks env.
2455

2456
    This runs on the master node.
2457

2458
    """
2459
    env = {
2460
      "OP_TARGET": self.op.node_name,
2461
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2462
      "OFFLINE": str(self.op.offline),
2463
      "DRAINED": str(self.op.drained),
2464
      }
2465
    nl = [self.cfg.GetMasterNode(),
2466
          self.op.node_name]
2467
    return env, nl, nl
2468

    
2469
  def CheckPrereq(self):
2470
    """Check prerequisites.
2471

2472
    This only checks the instance list against the existing names.
2473

2474
    """
2475
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2476

    
2477
    if ((self.op.master_candidate == False or self.op.offline == True or
2478
         self.op.drained == True) and node.master_candidate):
2479
      # we will demote the node from master_candidate
2480
      if self.op.node_name == self.cfg.GetMasterNode():
2481
        raise errors.OpPrereqError("The master node has to be a"
2482
                                   " master candidate, online and not drained")
2483
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2484
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2485
      if num_candidates <= cp_size:
2486
        msg = ("Not enough master candidates (desired"
2487
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2488
        if self.op.force:
2489
          self.LogWarning(msg)
2490
        else:
2491
          raise errors.OpPrereqError(msg)
2492

    
2493
    if (self.op.master_candidate == True and
2494
        ((node.offline and not self.op.offline == False) or
2495
         (node.drained and not self.op.drained == False))):
2496
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2497
                                 " to master_candidate" % node.name)
2498

    
2499
    return
2500

    
2501
  def Exec(self, feedback_fn):
2502
    """Modifies a node.
2503

2504
    """
2505
    node = self.node
2506

    
2507
    result = []
2508
    changed_mc = False
2509

    
2510
    if self.op.offline is not None:
2511
      node.offline = self.op.offline
2512
      result.append(("offline", str(self.op.offline)))
2513
      if self.op.offline == True:
2514
        if node.master_candidate:
2515
          node.master_candidate = False
2516
          changed_mc = True
2517
          result.append(("master_candidate", "auto-demotion due to offline"))
2518
        if node.drained:
2519
          node.drained = False
2520
          result.append(("drained", "clear drained status due to offline"))
2521

    
2522
    if self.op.master_candidate is not None:
2523
      node.master_candidate = self.op.master_candidate
2524
      changed_mc = True
2525
      result.append(("master_candidate", str(self.op.master_candidate)))
2526
      if self.op.master_candidate == False:
2527
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2528
        msg = rrc.fail_msg
2529
        if msg:
2530
          self.LogWarning("Node failed to demote itself: %s" % msg)
2531

    
2532
    if self.op.drained is not None:
2533
      node.drained = self.op.drained
2534
      result.append(("drained", str(self.op.drained)))
2535
      if self.op.drained == True:
2536
        if node.master_candidate:
2537
          node.master_candidate = False
2538
          changed_mc = True
2539
          result.append(("master_candidate", "auto-demotion due to drain"))
2540
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2541
          msg = rrc.fail_msg
2542
          if msg:
2543
            self.LogWarning("Node failed to demote itself: %s" % msg)
2544
        if node.offline:
2545
          node.offline = False
2546
          result.append(("offline", "clear offline status due to drain"))
2547

    
2548
    # this will trigger configuration file update, if needed
2549
    self.cfg.Update(node)
2550
    # this will trigger job queue propagation or cleanup
2551
    if changed_mc:
2552
      self.context.ReaddNode(node)
2553

    
2554
    return result
2555

    
2556

    
2557
class LUPowercycleNode(NoHooksLU):
2558
  """Powercycles a node.
2559

2560
  """
2561
  _OP_REQP = ["node_name", "force"]
2562
  REQ_BGL = False
2563

    
2564
  def CheckArguments(self):
2565
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2566
    if node_name is None:
2567
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2568
    self.op.node_name = node_name
2569
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
2570
      raise errors.OpPrereqError("The node is the master and the force"
2571
                                 " parameter was not set")
2572

    
2573
  def ExpandNames(self):
2574
    """Locking for PowercycleNode.
2575

2576
    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.
2578

2579
    """
2580
    self.needed_locks = {}
2581

    
2582
  def CheckPrereq(self):
2583
    """Check prerequisites.
2584

2585
    This LU has no prereqs.
2586

2587
    """
2588
    pass
2589

    
2590
  def Exec(self, feedback_fn):
2591
    """Reboots a node.
2592

2593
    """
2594
    result = self.rpc.call_node_powercycle(self.op.node_name,
2595
                                           self.cfg.GetHypervisorType())
2596
    result.Raise("Failed to schedule the reboot")
2597
    return result.payload
2598

    
2599

    
2600
class LUQueryClusterInfo(NoHooksLU):
2601
  """Query cluster configuration.
2602

2603
  """
2604
  _OP_REQP = []
2605
  REQ_BGL = False
2606

    
2607
  def ExpandNames(self):
2608
    self.needed_locks = {}
2609

    
2610
  def CheckPrereq(self):
2611
    """No prerequsites needed for this LU.
2612

2613
    """
2614
    pass
2615

    
2616
  def Exec(self, feedback_fn):
2617
    """Return cluster config.
2618

2619
    """
2620
    cluster = self.cfg.GetClusterInfo()
2621
    result = {
2622
      "software_version": constants.RELEASE_VERSION,
2623
      "protocol_version": constants.PROTOCOL_VERSION,
2624
      "config_version": constants.CONFIG_VERSION,
2625
      "os_api_version": max(constants.OS_API_VERSIONS),
2626
      "export_version": constants.EXPORT_VERSION,
2627
      "architecture": (platform.architecture()[0], platform.machine()),
2628
      "name": cluster.cluster_name,
2629
      "master": cluster.master_node,
2630
      "default_hypervisor": cluster.default_hypervisor,
2631
      "enabled_hypervisors": cluster.enabled_hypervisors,
2632
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2633
                        for hypervisor_name in cluster.enabled_hypervisors]),
2634
      "beparams": cluster.beparams,
2635
      "nicparams": cluster.nicparams,
2636
      "candidate_pool_size": cluster.candidate_pool_size,
2637
      "master_netdev": cluster.master_netdev,
2638
      "volume_group_name": cluster.volume_group_name,
2639
      "file_storage_dir": cluster.file_storage_dir,
2640
      }
2641

    
2642
    return result
2643

    
2644

    
2645
class LUQueryConfigValues(NoHooksLU):
2646
  """Return configuration values.
2647

2648
  """
2649
  _OP_REQP = []
2650
  REQ_BGL = False
2651
  _FIELDS_DYNAMIC = utils.FieldSet()
2652
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2653

    
2654
  def ExpandNames(self):
2655
    self.needed_locks = {}
2656

    
2657
    _CheckOutputFields(static=self._FIELDS_STATIC,
2658
                       dynamic=self._FIELDS_DYNAMIC,
2659
                       selected=self.op.output_fields)
2660

    
2661
  def CheckPrereq(self):
2662
    """No prerequisites.
2663

2664
    """
2665
    pass
2666

    
2667
  def Exec(self, feedback_fn):
2668
    """Dump a representation of the cluster config to the standard output.
2669

2670
    """
2671
    values = []
2672
    for field in self.op.output_fields:
2673
      if field == "cluster_name":
2674
        entry = self.cfg.GetClusterName()
2675
      elif field == "master_node":
2676
        entry = self.cfg.GetMasterNode()
2677
      elif field == "drain_flag":
2678
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2679
      else:
2680
        raise errors.ParameterError(field)
2681
      values.append(entry)
2682
    return values
2683

    
2684

    
2685
class LUActivateInstanceDisks(NoHooksLU):
2686
  """Bring up an instance's disks.
2687

2688
  """
2689
  _OP_REQP = ["instance_name"]
2690
  REQ_BGL = False
2691

    
2692
  def ExpandNames(self):
2693
    self._ExpandAndLockInstance()
2694
    self.needed_locks[locking.LEVEL_NODE] = []
2695
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2696

    
2697
  def DeclareLocks(self, level):
2698
    if level == locking.LEVEL_NODE:
2699
      self._LockInstancesNodes()
2700

    
2701
  def CheckPrereq(self):
2702
    """Check prerequisites.
2703

2704
    This checks that the instance is in the cluster.
2705

2706
    """
2707
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2708
    assert self.instance is not None, \
2709
      "Cannot retrieve locked instance %s" % self.op.instance_name
2710
    _CheckNodeOnline(self, self.instance.primary_node)
2711

    
2712
  def Exec(self, feedback_fn):
2713
    """Activate the disks.
2714

2715
    """
2716
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2717
    if not disks_ok:
2718
      raise errors.OpExecError("Cannot activate block devices")
2719

    
2720
    return disks_info
2721

    
2722

    
2723
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2724
  """Prepare the block devices for an instance.
2725

2726
  This sets up the block devices on all nodes.
2727

2728
  @type lu: L{LogicalUnit}
2729
  @param lu: the logical unit on whose behalf we execute
2730
  @type instance: L{objects.Instance}
2731
  @param instance: the instance for whose disks we assemble
2732
  @type ignore_secondaries: boolean
2733
  @param ignore_secondaries: if true, errors on secondary nodes
2734
      won't result in an error return from the function
2735
  @return: a tuple of (disks_ok, device_info), where disks_ok is False if
      the operation failed and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices
2738

2739
  """
2740
  device_info = []
2741
  disks_ok = True
2742
  iname = instance.name
2743
  # With the two passes mechanism we try to reduce the window of
2744
  # opportunity for the race condition of switching DRBD to primary
2745
  # before handshaking occurred, but we do not eliminate it
2746

    
2747
  # The proper fix would be to wait (with some limits) until the
2748
  # connection has been made and drbd transitions from WFConnection
2749
  # into any other network-connected state (Connected, SyncTarget,
2750
  # SyncSource, etc.)
2751

    
2752
  # 1st pass, assemble on all nodes in secondary mode
2753
  for inst_disk in instance.disks:
2754
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2755
      lu.cfg.SetDiskID(node_disk, node)
2756
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2757
      msg = result.fail_msg
2758
      if msg:
2759
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2760
                           " (is_primary=False, pass=1): %s",
2761
                           inst_disk.iv_name, node, msg)
2762
        if not ignore_secondaries:
2763
          disks_ok = False
2764

    
2765
  # FIXME: race condition on drbd migration to primary
2766

    
2767
  # 2nd pass, do only the primary node
2768
  for inst_disk in instance.disks:
2769
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2770
      if node != instance.primary_node:
2771
        continue
2772
      lu.cfg.SetDiskID(node_disk, node)
2773
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2774
      msg = result.fail_msg
2775
      if msg:
2776
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2777
                           " (is_primary=True, pass=2): %s",
2778
                           inst_disk.iv_name, node, msg)
2779
        disks_ok = False
2780
    device_info.append((instance.primary_node, inst_disk.iv_name,
2781
                        result.payload))
2782

    
2783
  # leave the disks configured for the primary node
2784
  # this is a workaround that would be fixed better by
2785
  # improving the logical/physical id handling
2786
  for disk in instance.disks:
2787
    lu.cfg.SetDiskID(disk, instance.primary_node)
2788

    
2789
  return disks_ok, device_info
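
# Illustrative example of the second return value above (hypothetical names):
# for a single-disk instance it might look like
#   [("node1.example.com", "disk/0", "/dev/drbd0")]
# i.e. (primary node, instance-visible disk name, node-visible device).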
2790

    
2791

    
2792
def _StartInstanceDisks(lu, instance, force):
2793
  """Start the disks of an instance.
2794

2795
  """
2796
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
2798
  if not disks_ok:
2799
    _ShutdownInstanceDisks(lu, instance)
2800
    if force is not None and not force:
2801
      lu.proc.LogWarning("", hint="If the message above refers to a"
2802
                         " secondary node,"
2803
                         " you can retry the operation using '--force'.")
2804
    raise errors.OpExecError("Disk consistency error")
2805

    
2806

    
2807
class LUDeactivateInstanceDisks(NoHooksLU):
2808
  """Shutdown an instance's disks.
2809

2810
  """
2811
  _OP_REQP = ["instance_name"]
2812
  REQ_BGL = False
2813

    
2814
  def ExpandNames(self):
2815
    self._ExpandAndLockInstance()
2816
    self.needed_locks[locking.LEVEL_NODE] = []
2817
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2818

    
2819
  def DeclareLocks(self, level):
2820
    if level == locking.LEVEL_NODE:
2821
      self._LockInstancesNodes()
2822

    
2823
  def CheckPrereq(self):
2824
    """Check prerequisites.
2825

2826
    This checks that the instance is in the cluster.
2827

2828
    """
2829
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2830
    assert self.instance is not None, \
2831
      "Cannot retrieve locked instance %s" % self.op.instance_name
2832

    
2833
  def Exec(self, feedback_fn):
2834
    """Deactivate the disks
2835

2836
    """
2837
    instance = self.instance
2838
    _SafeShutdownInstanceDisks(self, instance)
2839

    
2840

    
2841
def _SafeShutdownInstanceDisks(lu, instance):
2842
  """Shutdown block devices of an instance.
2843

2844
  This function checks if an instance is running, before calling
2845
  _ShutdownInstanceDisks.
2846

2847
  """
2848
  pnode = instance.primary_node
2849
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2850
  ins_l.Raise("Can't contact node %s" % pnode)
2851

    
2852
  if instance.name in ins_l.payload:
2853
    raise errors.OpExecError("Instance is running, can't shutdown"
2854
                             " block devices.")
2855

    
2856
  _ShutdownInstanceDisks(lu, instance)
2857

    
2858

    
2859
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2860
  """Shutdown block devices of an instance.
2861

2862
  This does the shutdown on all nodes of the instance.
2863

2864
  Errors on the primary node are ignored only when ignore_primary is
  true; errors on other nodes always make the function report failure.
2866

2867
  """
2868
  all_result = True
2869
  for disk in instance.disks:
2870
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2871
      lu.cfg.SetDiskID(top_disk, node)
2872
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2873
      msg = result.fail_msg
2874
      if msg:
2875
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2876
                      disk.iv_name, node, msg)
2877
        if not ignore_primary or node != instance.primary_node:
2878
          all_result = False
2879
  return all_result
2880

    
2881

    
2882
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2883
  """Checks if a node has enough free memory.
2884

2885
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2889

2890
  @type lu: C{LogicalUnit}
2891
  @param lu: a logical unit from which we get configuration data
2892
  @type node: C{str}
2893
  @param node: the node to check
2894
  @type reason: C{str}
2895
  @param reason: string to use in the error message
2896
  @type requested: C{int}
2897
  @param requested: the amount of memory in MiB to check for
2898
  @type hypervisor_name: C{str}
2899
  @param hypervisor_name: the hypervisor to ask for memory stats
2900
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2901
      we cannot check the node
2902

2903
  """
2904
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2905
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2906
  free_mem = nodeinfo[node].payload.get('memory_free', None)
2907
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
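      # Build the effective hypervisor parameters by layering the cluster
      # defaults, the instance's own hvparams and finally the one-off
      # overrides passed in this opcode, then validate the result.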
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

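    # Soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated as shutdown + disk cycle + start.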
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
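    # The instance is marked down in the configuration first; a failed
    # shutdown RPC is only logged as a warning and the disks are
    # deactivated regardless.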
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

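    # For file-based disk templates the storage directory path contains the
    # instance name, so the directory on the primary node must be renamed
    # as well.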
    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

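    # Locking is only needed when live data was requested (at least one
    # output field is not static) and the caller asked for it via
    # use_locking; purely static fields are served from the configuration.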
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed or result.fail_msg:
          bad_nodes.append(name)
        else:
          if result.payload:
            live_data.update(result.payload)
          # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    cluster = self.cfg.GetClusterInfo()
    for instance in instance_list:
      iout = []
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
      i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                 nic.nicparams) for nic in instance.nics]
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
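        # The status field combines the admin state with the live state:
        # "running"/"ADMIN_down" when both agree, "ERROR_up"/"ERROR_down"
        # when they disagree, and ERROR_node* when the node is unreachable.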
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "nic_mode":
          if instance.nics:
            val = i_nicp[0][constants.NIC_MODE]
          else:
            val = None
        elif field == "nic_link":
          if instance.nics:
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "bridge":
          if (instance.nics and
              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "modes":
              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
            elif st_groups[1] == "links":
              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
            elif st_groups[1] == "bridges":
              val = []
              for nicp in i_nicp:
                if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
                  val.append(nicp[constants.NIC_LINK])
                else:
                  val.append(None)
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "mode":
                  val = i_nicp[nic_idx][constants.NIC_MODE]
                elif st_groups[1] == "link":
                  val = i_nicp[nic_idx][constants.NIC_LINK]
                elif st_groups[1] == "bridge":
                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
                  if nic_mode == constants.NIC_MODE_BRIDGED:
                    val = i_nicp[nic_idx][constants.NIC_LINK]
                  else:
                    val = None
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

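    # Failover sequence: verify disk consistency, shut the instance down on
    # the current primary, deactivate its disks there, flip primary and
    # secondary in the configuration and, if the instance was marked up,
    # restart it on the new primary node.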
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      result.Raise("Can't migrate, please use failover", prereq=True)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
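    # Poll the nodes until every disk reports the resync as finished; while
    # waiting, show the worst (minimum) completion percentage and sleep two
    # seconds between polls.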
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if