Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 08896026

History | View | Annotate | Download (250.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import re
30
import platform
31
import logging
32
import copy
33

    
34
from ganeti import ssh
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import serializer
42
from ganeti import ssconf
43

    
44

    
45
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # by default all locks are exclusive (value 0 at every level)
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    # support for dry-run
    self.dry_run_result = None

    # validate that all parameters the LU declares as required are present
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object, creating it lazily on first access.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possible
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # consume the flag so a second call without re-arming fails the assert
    del self.recalculate_locks[locking.LEVEL_NODE]
328

    
329

    
330
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # HPATH None means BuildHooksEnv is never called for these LUs
  HPATH = None
  HTYPE = None
339

    
340

    
341
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  # Expand every short name to its canonical form, rejecting unknown nodes
  expanded = []
  for node_name in nodes:
    full_name = lu.cfg.ExpandNodeName(node_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % node_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
368

    
369

    
370
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # Empty/None list means "all instances"; note only this branch sorts
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    full_name = lu.cfg.ExpandInstanceName(name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(full_name)
  return wanted
398

    
399

    
400
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @raise errors.OpPrereqError: if any selected field is unknown

  """
  # Merge both field sets into a single one for matching
  all_fields = utils.FieldSet()
  all_fields.Extend(static)
  all_fields.Extend(dynamic)

  unknown = all_fields.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown))
417

    
418

    
419
def _CheckBooleanOpField(op, name):
420
  """Validates boolean opcode parameters.
421

422
  This will ensure that an opcode parameter is either a boolean value,
423
  or None (but that it always exists).
424

425
  """
426
  val = getattr(op, name, None)
427
  if not (val is None or isinstance(val, bool)):
428
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429
                               (name, str(val)))
430
  setattr(op, name, val)
431

    
432

    
433
def _CheckNodeOnline(lu, node):
434
  """Ensure that a given node is online.
435

436
  @param lu: the LU on behalf of which we make the check
437
  @param node: the node to check
438
  @raise errors.OpPrereqError: if the node is offline
439

440
  """
441
  if lu.cfg.GetNodeInfo(node).offline:
442
    raise errors.OpPrereqError("Can't use offline node %s" % node)
443

    
444

    
445
def _CheckNodeNotDrained(lu, node):
446
  """Ensure that a given node is not drained.
447

448
  @param lu: the LU on behalf of which we make the check
449
  @param node: the node to check
450
  @raise errors.OpPrereqError: if the node is drained
451

452
  """
453
  if lu.cfg.GetNodeInfo(node).drained:
454
    raise errors.OpPrereqError("Can't use drained node %s" % node)
455

    
456

    
457
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458
                          memory, vcpus, nics, disk_template, disks,
459
                          bep, hvp, hypervisor):
460
  """Builds instance related env variables for hooks
461

462
  This builds the hook environment from individual variables.
463

464
  @type name: string
465
  @param name: the name of the instance
466
  @type primary_node: string
467
  @param primary_node: the name of the instance's primary node
468
  @type secondary_nodes: list
469
  @param secondary_nodes: list of secondary nodes as strings
470
  @type os_type: string
471
  @param os_type: the name of the instance's OS
472
  @type status: boolean
473
  @param status: the should_run status of the instance
474
  @type memory: string
475
  @param memory: the memory size of the instance
476
  @type vcpus: string
477
  @param vcpus: the count of VCPUs the instance has
478
  @type nics: list
479
  @param nics: list of tuples (ip, mac, mode, link) representing
480
      the NICs the instance has
481
  @type disk_template: string
482
  @param disk_template: the distk template of the instance
483
  @type disks: list
484
  @param disks: the list of (size, mode) pairs
485
  @type bep: dict
486
  @param bep: the backend parameters for the instance
487
  @type hvp: dict
488
  @param hvp: the hypervisor parameters for the instance
489
  @type hypervisor: string
490
  @param hypervisor: the hypervisor for the instance
491
  @rtype: dict
492
  @return: the hook environment for this instance
493

494
  """
495
  if status:
496
    str_status = "up"
497
  else:
498
    str_status = "down"
499
  env = {
500
    "OP_TARGET": name,
501
    "INSTANCE_NAME": name,
502
    "INSTANCE_PRIMARY": primary_node,
503
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504
    "INSTANCE_OS_TYPE": os_type,
505
    "INSTANCE_STATUS": str_status,
506
    "INSTANCE_MEMORY": memory,
507
    "INSTANCE_VCPUS": vcpus,
508
    "INSTANCE_DISK_TEMPLATE": disk_template,
509
    "INSTANCE_HYPERVISOR": hypervisor,
510
  }
511

    
512
  if nics:
513
    nic_count = len(nics)
514
    for idx, (ip, mac, mode, link) in enumerate(nics):
515
      if ip is None:
516
        ip = ""
517
      env["INSTANCE_NIC%d_IP" % idx] = ip
518
      env["INSTANCE_NIC%d_MAC" % idx] = mac
519
      env["INSTANCE_NIC%d_MODE" % idx] = mode
520
      env["INSTANCE_NIC%d_LINK" % idx] = link
521
      if mode == constants.NIC_MODE_BRIDGED:
522
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
523
  else:
524
    nic_count = 0
525

    
526
  env["INSTANCE_NIC_COUNT"] = nic_count
527

    
528
  if disks:
529
    disk_count = len(disks)
530
    for idx, (size, mode) in enumerate(disks):
531
      env["INSTANCE_DISK%d_SIZE" % idx] = size
532
      env["INSTANCE_DISK%d_MODE" % idx] = mode
533
  else:
534
    disk_count = 0
535

    
536
  env["INSTANCE_DISK_COUNT"] = disk_count
537

    
538
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
539
    for key, value in source.items():
540
      env["INSTANCE_%s_%s" % (kind, key)] = value
541

    
542
  return env
543

    
544
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  cluster_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  result = []
  for nic in nics:
    # Merge cluster defaults with the NIC's own parameter overrides
    filled = objects.FillDict(cluster_nicparams, nic.nicparams)
    result.append((nic.ip, nic.mac,
                   filled[constants.NIC_MODE], filled[constants.NIC_LINK]))
  return result
566

    
567
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster_info = lu.cfg.GetClusterInfo()
  be_full = cluster_info.FillBE(instance)
  hv_full = cluster_info.FillHV(instance)
  # Keyword arguments for _BuildInstanceHookEnv; keys must match its
  # parameter names exactly
  hook_args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': be_full[constants.BE_MEMORY],
    'vcpus': be_full[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': be_full,
    'hvp': hv_full,
    'hypervisor': instance.hypervisor,
  }
  if override:
    hook_args.update(override)
  return _BuildInstanceHookEnv(**hook_args)
603

    
604

    
605
def _AdjustCandidatePool(lu):
606
  """Adjust the candidate pool after node operations.
607

608
  """
609
  mod_list = lu.cfg.MaintainCandidatePool()
610
  if mod_list:
611
    lu.LogInfo("Promoted nodes to master candidate role: %s",
612
               ", ".join(node.name for node in mod_list))
613
    for name in mod_list:
614
      lu.context.ReaddNode(name)
615
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
616
  if mc_now > mc_max:
617
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
618
               (mc_now, mc_max))
619

    
620

    
621
def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  @raise errors.OpPrereqError: via result.Raise, if a bridge check fails

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  filled = [objects.FillDict(c_nicparams, nic.nicparams)
            for nic in target_nics]
  # Only bridged NICs actually need a bridge to exist on the node
  bridges = [params[constants.NIC_LINK] for params in filled
             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if bridges:
    result = lu.rpc.call_bridges_exist(target_node, bridges)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)
635

    
636

    
637
def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  Defaults to the instance's primary node when no node is given.

  """
  check_node = node if node is not None else instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, check_node)
644

    
645

    
646
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master_name = self.cfg.GetMasterNode()

    # The master itself must be the sole remaining node
    node_names = self.cfg.GetNodeList()
    if len(node_names) != 1 or node_names[0] != master_name:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(node_names) - 1))
    instance_names = self.cfg.GetInstanceList()
    if instance_names:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instance_names))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_name = self.cfg.GetMasterNode()
    stop_result = self.rpc.call_node_stop_master(master_name, False)
    stop_result.Raise("Could not disable the master role")
    # Keep a backup of the cluster SSH keys before the cluster goes away
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)
    return master_name
682

    
683

    
684
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    # verification touches everything, but read-only: take all locks shared
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
    @return: True if the node is "bad" (some check failed), False otherwise

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version: only a warning, not fatal
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        # must_have_file is True for files every node must carry
        # (i.e. files which are NOT master-only)
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        # cross-check the configured minors against the in-use ones
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    @param instance: the instance name
    @param instanceconfig: the instance's configuration object
    @param node_vol_is: dict of node -> actually-present volumes
    @param node_instance: dict of node -> running instances
    @param feedback_fn: function used to accumulate results
    @param n_offline: list of offline node names (ignored for volumes)
    @return: True if the instance is "bad", False otherwise

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # the instance must not run anywhere but on its primary node
    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @return: True if orphan volumes were found, False otherwise

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    @return: True if orphan instances were found, False otherwise

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    @return: True if some node lacks N+1 memory headroom, False otherwise

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    @return: True if the whole verification succeeded, False otherwise

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info; note: this local was previously named 'nodeinfo',
      # shadowing the node list being iterated above
      nhvinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nhvinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nhvinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    # BUGFIX: this return was previously inside the POST branch above, so
    # the PRE phase returned None instead of the unchanged lu_result
    return lu_result
1285
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  # no opcode parameters are required
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # read-only check over the whole cluster: all locks, shared mode
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    # 'result' aliases the three containers, which are filled in below;
    # mutating res_nodes/res_instances/res_missing mutates 'result' too
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map every (node, volume) pair we expect to its owning instance;
    # only running, network-mirrored instances are considered
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      # nothing to check: the empty result tuple
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    # NOTE(review): to_act is never used below — appears to be leftover
    to_act = set()
    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        # offline nodes are silently skipped
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      # payload maps lv_name -> (size?, inactive?, online?) — the first
      # two elements are unused here; only lv_online matters
      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          # known volume exists but is offline: instance needs activate-disks
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  # the new cluster name is the only required opcode parameter
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    # hooks run only on the master node
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    # resolves the name; raises if it cannot be resolved
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # refuse to take over an IP that is already live on the network
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    # store the fully-resolved name for Exec
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        # the master already has the file; push only to the other nodes
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          # distribution failures are warnings only, not fatal
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      # always try to restart the master role, even if the rename failed
      result = self.rpc.call_node_start_master(master, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  children = disk.children
  if children and any(_RecursiveCheckIfLVMBased(child) for child in children):
    return True
  return disk.dev_type == constants.LD_LV
1467
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  Handles optional updates of the volume group name, the hypervisor
  list and parameters, the default backend/NIC parameters and the
  master candidate pool size.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    Normalizes self.op.candidate_pool_size: a missing attribute becomes
    None, any other value must convert to a strictly positive integer.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, before and after the change.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # disabling LVM storage (vg_name set but empty) is only allowed when
    # no LVM-based instance disks remain
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters: start from a copy of the current
    # cluster-level hvparams and merge the requested changes on top
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      # an empty string means "disable LVM" and is stored as None
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # Step 1: compute the destination node list (every node but the master)
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  target_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    target_nodes.extend(additional_nodes)
  if master_info.name in target_nodes:
    target_nodes.remove(master_info.name)

  # Step 2: collect the set of file names to push, including the
  # ancillary files of every enabled hypervisor
  files_to_copy = set([constants.ETC_HOSTS,
                       constants.SSH_KNOWN_HOSTS_FILE,
                       constants.RAPI_CERT_FILE,
                       constants.RAPI_USERS_FILE,
                      ])
  for hv_name in lu.cfg.GetClusterInfo().enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    files_to_copy.update(hv_class.GetAncillaryFiles())

  # Step 3: upload each locally-existing file, warning on per-node failures
  for fname in files_to_copy:
    if not os.path.exists(fname):
      continue
    upload_result = lu.rpc.call_upload_file(target_nodes, fname)
    for to_node, to_result in upload_result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # all node locks, shared: the push touches every node but
    # modifies none of them
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing to check for this LU.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    # re-saving the (unchanged) cluster object triggers the standard
    # config/ssconf distribution; then copy the ancillary files as well
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Polls the primary node's mirror status until all disks report no
  remaining sync percentage, tolerating up to 10 consecutive RPC
  failures and a short window of transient "degraded" states.

  @param lu: the calling logical unit (used for config, rpc and logging)
  @param instance: the instance whose disks are polled
  @param oneshot: if True, poll once (plus degraded retries) and return
      instead of looping until fully synced
  @param unlock: unused in this function
  @return: True if the disks ended up non-degraded, False otherwise

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      # RPC-level failure: retry a bounded number of times before
      # giving up on the node entirely
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # sleep proportionally to the largest estimated remaining time,
    # capped at one minute
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  @return: True if the device (and all its children) looks consistent

  """
  lu.cfg.SetDiskID(dev, node)
  # select which status field of the blockdev_find payload to inspect
  status_idx = 5
  if ldisk:
    status_idx = 6

  healthy = True
  if on_primary or dev.AssembleOnSecondary():
    find_result = lu.rpc.call_blockdev_find(node, dev)
    err = find_result.fail_msg
    if err:
      lu.LogWarning("Can't find disk on node %s: %s", node, err)
      healthy = False
    elif not find_result.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      healthy = False
    else:
      healthy = healthy and (not find_result.payload[status_idx])

  # NOTE(review): children are checked with ldisk left at its default
  # (False), matching the original logic
  if dev.children:
    for child in dev.children:
      healthy = healthy and _CheckDiskConsistency(lu, child, node, on_primary)

  return healthy


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing to check for this LU.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is "valid" only if its first variant is OK on every node
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # use the call form of raise, consistent with the rest of the
      # module (the legacy "raise Exc, args" statement form is deprecated)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    # store the canonical (expanded) node name for Exec and the hooks
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    # a failure of the remote cleanup is only a warning: the node is
    # already gone from the configuration at this point
    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  # fields that require a live RPC call to the nodes
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  # fields answered purely from the configuration
  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      # without locking, nodes may have disappeared between expansion
      # and this point; treat that as an execution error
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      # gather the live (RPC-provided) data for the dynamic fields
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # only walk the instance list if some instance-related field was asked
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # lock the requested nodes (or all nodes) in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # map each instance to its per-node logical volumes, to be able to
    # answer the "instance" output field below
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      # copy before sorting so the RPC payload stays untouched
      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV; '-' if unowned
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
2202
  """Logical unit for adding node to the cluster.
2203

2204
  """
2205
  HPATH = "node-add"
2206
  HTYPE = constants.HTYPE_NODE
2207
  _OP_REQP = ["node_name"]
2208

    
2209
  def BuildHooksEnv(self):
2210
    """Build hooks env.
2211

2212
    This will run on all nodes before, and on all nodes + the new node after.
2213

2214
    """
2215
    env = {
2216
      "OP_TARGET": self.op.node_name,
2217
      "NODE_NAME": self.op.node_name,
2218
      "NODE_PIP": self.op.primary_ip,
2219
      "NODE_SIP": self.op.secondary_ip,
2220
      }
2221
    nodes_0 = self.cfg.GetNodeList()
2222
    nodes_1 = nodes_0 + [self.op.node_name, ]
2223
    return env, nodes_0, nodes_1
2224

    
2225
  def CheckPrereq(self):
2226
    """Check prerequisites.
2227

2228
    This checks:
2229
     - the new node is not already in the config
2230
     - it is resolvable
2231
     - its parameters (single/dual homed) matches the cluster
2232

2233
    Any errors are signalled by raising errors.OpPrereqError.
2234

2235
    """
2236
    node_name = self.op.node_name
2237
    cfg = self.cfg
2238

    
2239
    dns_data = utils.HostInfo(node_name)
2240

    
2241
    node = dns_data.name
2242
    primary_ip = self.op.primary_ip = dns_data.ip
2243
    secondary_ip = getattr(self.op, "secondary_ip", None)
2244
    if secondary_ip is None:
2245
      secondary_ip = primary_ip
2246
    if not utils.IsValidIP(secondary_ip):
2247
      raise errors.OpPrereqError("Invalid secondary IP given")
2248
    self.op.secondary_ip = secondary_ip
2249

    
2250
    node_list = cfg.GetNodeList()
2251
    if not self.op.readd and node in node_list:
2252
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2253
                                 node)
2254
    elif self.op.readd and node not in node_list:
2255
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2256

    
2257
    for existing_node_name in node_list:
2258
      existing_node = cfg.GetNodeInfo(existing_node_name)
2259

    
2260
      if self.op.readd and node == existing_node_name:
2261
        if (existing_node.primary_ip != primary_ip or
2262
            existing_node.secondary_ip != secondary_ip):
2263
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2264
                                     " address configuration as before")
2265
        continue
2266

    
2267
      if (existing_node.primary_ip == primary_ip or
2268
          existing_node.secondary_ip == primary_ip or
2269
          existing_node.primary_ip == secondary_ip or
2270
          existing_node.secondary_ip == secondary_ip):
2271
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2272
                                   " existing node %s" % existing_node.name)
2273

    
2274
    # check that the type of the node (single versus dual homed) is the
2275
    # same as for the master
2276
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2277
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2278
    newbie_singlehomed = secondary_ip == primary_ip
2279
    if master_singlehomed != newbie_singlehomed:
2280
      if master_singlehomed:
2281
        raise errors.OpPrereqError("The master has no private ip but the"
2282
                                   " new node has one")
2283
      else:
2284
        raise errors.OpPrereqError("The master has a private ip but the"
2285
                                   " new node doesn't have one")
2286

    
2287
    # checks reachablity
2288
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2289
      raise errors.OpPrereqError("Node not reachable by ping")
2290

    
2291
    if not newbie_singlehomed:
2292
      # check reachability from my secondary ip to newbie's secondary ip
2293
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2294
                           source=myself.secondary_ip):
2295
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2296
                                   " based ping to noded port")
2297

    
2298
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2299
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2300
    master_candidate = mc_now < cp_size
2301

    
2302
    self.new_node = objects.Node(name=node,
2303
                                 primary_ip=primary_ip,
2304
                                 secondary_ip=secondary_ip,
2305
                                 master_candidate=master_candidate,
2306
                                 offline=False, drained=False)
2307

    
2308
  def Exec(self, feedback_fn):
    """Add (or re-add) the prepared node to the cluster.

    Verifies protocol compatibility, distributes the SSH host and user
    keys, updates /etc/hosts if so configured, validates the secondary
    IP, runs a node-verify pass from the master, and finally registers
    the node with the cluster context.

    """
    new_node = self.new_node
    node = new_node.name

    # connectivity + protocol check: refuse nodes running a different
    # inter-node protocol version
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION != result.payload:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))
    logging.info("Communication to node %s fine, sw version %s match",
                 node, result.payload)

    # distribute the SSH host keys and the cluster user keypair
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]
    keyarray = []
    for keyfile in keyfiles:
      keyfd = open(keyfile, 'r')
      try:
        keyarray.append(keyfd.read())
      finally:
        keyfd.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])
    result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    # dual-homed node: make sure it really owns the declared secondary IP
    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # ask the master to run a verification pass against the new node
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload['nodelist']
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # finally register the node; on readd the node already has all
    # ancillary files plus a config entry, so only refresh them
    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)
2383

    
2384

    
2385
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    # the three flags are optional booleans; normalize missing ones to None
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    # offline/drained/master_candidate are mutually exclusive when enabled
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nodes = [self.cfg.GetMasterNode(),
             self.op.node_name]
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    demoting = (self.op.master_candidate == False or self.op.offline == True or
                self.op.drained == True)
    if demoting and node.master_candidate:
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        # dropping below the candidate pool size is only allowed with --force
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    # cannot promote an offline/drained node unless that flag is being
    # cleared in the same operation
    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    # list of (name, new value/reason) pairs, returned to the caller
    changes = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      changes.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        # an offline node can be neither a candidate nor drained
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          changes.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          changes.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      changes.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      changes.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        # a drained node can be neither a candidate nor offline
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          changes.append(("master_candidate", "auto-demotion due to drain"))
        if node.offline:
          node.offline = False
          changes.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return changes
2510

    
2511

    
2512
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    # powercycling the master is destructive enough to require --force
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resource option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload
2553

    
2554

    
2555
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # read-only query, no locks needed
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequsites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    # only expose hvparams for hypervisors that are actually enabled
    enabled_hvparams = dict([(hvname, cluster.hvparams[hvname])
                             for hvname in cluster.enabled_hypervisors])
    return {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": enabled_hvparams,
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      }
2598

    
2599

    
2600
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    # reject any output field outside the static/dynamic sets early
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        values.append(self.cfg.GetClusterName())
      elif field == "master_node":
        values.append(self.cfg.GetMasterNode())
      elif field == "drain_flag":
        # the drain flag is just the presence of a marker file
        values.append(os.path.exists(constants.JOB_QUEUE_DRAIN_FILE))
      else:
        raise errors.ParameterError(field)
    return values
2638

    
2639

    
2640
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
2676

    
2677

    
2678
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node_name, per_node_disk in \
        inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(per_node_disk, node_name)
      result = lu.rpc.call_blockdev_assemble(node_name, per_node_disk,
                                             iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node_name, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node_name, per_node_disk in \
        inst_disk.ComputeNodeTree(instance.primary_node):
      if node_name != instance.primary_node:
        continue
      lu.cfg.SetDiskID(per_node_disk, node_name)
      result = lu.rpc.call_blockdev_assemble(node_name, per_node_disk,
                                             iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node_name, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
2745

    
2746

    
2747
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  Assembles all instance disks; on failure the already-assembled
  devices are shut down again and OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      # hint the user that secondary-node failures can be overridden
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
2760

    
2761

    
2762
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    _SafeShutdownInstanceDisks(self, self.instance)
2794

    
2795

    
2796
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  # refuse to tear down disks under a live instance
  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
2812

    
2813

    
2814
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  ignored.

  """
  success = True
  for disk in instance.disks:
    for node_name, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_name)
      shutdown_result = lu.rpc.call_blockdev_shutdown(node_name, top_disk)
      err = shutdown_result.fail_msg
      if not err:
        continue
      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, node_name, err)
      # primary-node failures are only tolerated with ignore_primary
      if not ignore_primary or node_name != instance.primary_node:
        success = False
  return success
2835

    
2836

    
2837
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  # a non-int payload means the node could not report its memory state
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
2869

    
2870

    
2871
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nodes = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # optional one-shot backend parameter overrides for this start
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # optional one-shot hypervisor parameter overrides for this start
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      # only need the free-memory check if we'll actually start it
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    # mark it up in the config first, so a crash leaves it "wanted up"
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      # clean up the disks we just assembled before reporting the failure
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
2969

    
2970

    
2971
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    valid_reboot_types = [constants.INSTANCE_REBOOT_SOFT,
                          constants.INSTANCE_REBOOT_HARD,
                          constants.INSTANCE_REBOOT_FULL]
    if self.op.reboot_type not in valid_reboot_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nodes = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboot: the hypervisor on the primary node does the work
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      # full reboot: stop the instance, cycle its disks, start it again
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
3049

    
3050

    
3051
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nodes = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # mark it down first, so a crash leaves it "wanted down"
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      # best-effort: still tear down the disks below
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)
3097

    
3098

    
3099
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # the instance must also not be live on its primary node, otherwise
    # the fresh OS install would run under a running instance
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    # os_type is an optional opcode attribute; None means "keep current OS"
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # FIX: the original formatted self.op.pnode here, but this opcode
        # has no "pnode" attribute (_OP_REQP is only instance_name), so the
        # intended OpPrereqError would have been masked by an AttributeError
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      # persist the OS change before running the create scripts
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      # always deactivate the disks again, even if the install failed
      _ShutdownInstanceDisks(self, inst)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # the rename scripts must not run under a live instance
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    new_name = name_info.name
    self.op.new_name = new_name
    if new_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    # file-based disks live in a directory named after the instance, so
    # remember the old location before the config rename below
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      rename_result = self.rpc.call_instance_run_rename(inst.primary_node,
                                                        inst, old_name)
      err = rename_result.fail_msg
      if err:
        # failure of the OS rename script is non-fatal; the config rename
        # already happened
        warn_msg = ("Could not run OS rename script for instance %s on node %s"
                    " (but the instance has been renamed in Ganeti): %s" %
                    (inst.name, inst.primary_node, err))
        self.proc.LogWarning(warn_msg)
    finally:
      _ShutdownInstanceDisks(self, inst)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    inst = self.instance
    logging.info("Shutting down instance %s on node %s",
                 inst.name, inst.primary_node)

    shutdown_err = self.rpc.call_instance_shutdown(inst.primary_node,
                                                   inst).fail_msg
    if shutdown_err:
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (inst.name, inst.primary_node, shutdown_err))
      feedback_fn("Warning: can't shutdown instance: %s" % shutdown_err)

    logging.info("Removing block devices for instance %s", inst.name)

    if not _RemoveDisks(self, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", inst.name)

    self.cfg.RemoveInstance(inst.name)
    # schedule the removal of the (now stale) instance lock
    self.remove_locks[locking.LEVEL_INSTANCE] = inst.name
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  # fields answerable from the configuration alone (no RPC needed)
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  # fields that require querying the nodes for live data
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    # queries only read, so shared locks suffice
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # node RPCs (and hence locking) are only needed for dynamic fields
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      # dynamic fields requested: query all primary nodes for live data
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed or result.fail_msg:
          bad_nodes.append(name)
        else:
          if result.payload:
            live_data.update(result.payload)
          # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    cluster = self.cfg.GetClusterInfo()
    for instance in instance_list:
      iout = []
      # fill hypervisor/backend/NIC parameters with cluster defaults
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
      i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                 nic.nicparams) for nic in instance.nics]
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          # live state; None if the primary node could not be queried
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined admin/live status string
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "nic_mode":
          if instance.nics:
            val = i_nicp[0][constants.NIC_MODE]
          else:
            val = None
        elif field == "nic_link":
          if instance.nics:
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "bridge":
          # legacy field: only meaningful for bridged NICs
          if (instance.nics and
              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          # legacy fields: map 'a'/'b' to disk index 0/1
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list (e.g. "disk.size/0", "nic.macs")
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "modes":
              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
            elif st_groups[1] == "links":
              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
            elif st_groups[1] == "bridges":
              # per-NIC bridge name, None for non-bridged NICs
              val = []
              for nicp in i_nicp:
                if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
                  val.append(nicp[constants.NIC_LINK])
                else:
                  val.append(None)
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "mode":
                  val = i_nicp[nic_idx][constants.NIC_MODE]
                elif st_groups[1] == "link":
                  val = i_nicp[nic_idx][constants.NIC_LINK]
                elif st_groups[1] == "bridge":
                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
                  if nic_mode == constants.NIC_MODE_BRIDGED:
                    val = i_nicp[nic_idx][constants.NIC_LINK]
                  else:
                    val = None
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # failover only makes sense for network-mirrored disk templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    # the failover target is the (single) secondary node
    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existance
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        # degraded disks abort the failover unless explicitly overridden
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_consistency:
        # the caller accepts a possibly-split state; warn and continue
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      if msg:
        # clean up the just-assembled disks before failing
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))
class LUMigrateInstance(LogicalUnit):
3769
  """Migrate an instance.
3770

3771
  This is migration without shutting down, compared to the failover,
3772
  which is done with shutdown.
3773

3774
  """
3775
  HPATH = "instance-migrate"
3776
  HTYPE = constants.HTYPE_INSTANCE
3777
  _OP_REQP = ["instance_name", "live", "cleanup"]
3778

    
3779
  REQ_BGL = False
3780

    
3781
  def ExpandNames(self):
    """Expand and lock the instance name.

    The node-level locks cannot be computed yet, so they are left
    empty here and resolved later in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    # node locks are filled in lazily, once the instance's nodes are known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  def DeclareLocks(self, level):
    """Acquire the locks for the instance's nodes at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    hook_env = _BuildInstanceHookEnvByObject(self, self.instance)
    hook_env.update({
      "MIGRATE_LIVE": self.op.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      })
    # hooks run on the master plus the instance's secondaries
    node_list = [self.cfg.GetMasterNode()]
    node_list.extend(self.instance.secondary_nodes)
    return hook_env, node_list, node_list
3802
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # migration is only implemented for DRBD 8 based instances
    if inst.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    sec_nodes = inst.secondary_nodes
    if not sec_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    bep = self.cfg.GetClusterInfo().FillBE(inst)

    tgt_node = sec_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, tgt_node, "migrating instance %s" %
                         inst.name, bep[constants.BE_MEMORY],
                         inst.hypervisor)

    # check bridge existance
    _CheckInstanceBridgesExist(self, inst, node=tgt_node)

    if not self.op.cleanup:
      # for a real migration (not a cleanup) the target must be usable
      # and the source hypervisor must agree the instance can migrate
      _CheckNodeNotDrained(self, tgt_node)
      result = self.rpc.call_instance_migratable(inst.primary_node,
                                                 inst)
      result.Raise("Can't migrate, please use failover", prereq=True)

    self.instance = inst
3842
  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    while True:
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      done = True
      lowest_pct = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        (node_done, node_pct) = nres.payload
        if not node_done:
          done = False
        # a None percentage means the node can't report progress
        if node_pct is not None and node_pct < lowest_pct:
          lowest_pct = node_pct
      if done:
        break
      if lowest_pct < 100:
        self.feedback_fn("   - progress: %.1f%%" % lowest_pct)
      time.sleep(2)
3867
  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    # make the disk IDs node-specific before issuing the RPC
    inst_disks = self.instance.disks
    for disk in inst_disks:
      self.cfg.SetDiskID(disk, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          inst_disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)
3880
  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    disc_result = self.rpc.call_drbd_disconnect_net(self.all_nodes,
                                                    self.nodes_ip,
                                                    self.instance.disks)
    for (node, nres) in disc_result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)
3890
  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    @param multimaster: whether to reconnect the disks in dual-master
        (needed during live migration) or single-master mode

    """
    if multimaster:
      mode = "dual-master"
    else:
      mode = "single-master"
    self.feedback_fn("* changing disks into %s mode" % mode)
    attach_result = self.rpc.call_drbd_attach_net(self.all_nodes,
                                                  self.nodes_ip,
                                                  self.instance.disks,
                                                  self.instance.name,
                                                  multimaster)
    for (node, nres) in attach_result.items():
      nres.Raise("Cannot change disks config on node %s" % node)
3905
  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # find out on which node(s) the hypervisor reports the instance
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for (node, result) in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    on_source = instance.name in ins_l[source_node].payload
    on_target = instance.name in ins_l[target_node].payload

    # running on both nodes is unrecoverable automatically
    if on_source and on_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    # running nowhere: safer to let the admin do a clean stop/start
    if not on_source and not on_target:
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if on_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")
3970
  def _RevertDiskStatus(self):
3971
    """Try to revert the disk status after a failed migration.
3972

3973
    """
3974
    target_node = self.target_node
3975
    try:
3976
      self._EnsureSecondary(target_node)
3977
      self._GoStandalone()
3978
      self._GoReconnect(False)
3979
      self._WaitUntilSync()
3980
    except errors.OpExecError, err:
3981
      self.LogWarning("Migration failed and I can't reconnect the"
3982
                      " drives: error '%s'\n"
3983
                      "Please look and recover the instance status" %
3984
                      str(err))
3985

    
3986
  def _AbortMigration(self):
3987
    """Call the hypervisor code to abort a started migration.
3988