
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import tempfile
30
import re
31
import platform
32
import logging
33
import copy
34
import random
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45
from ganeti import ssconf
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
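
  As a minimal sketch (the class below is purely illustrative and does not
  exist in this module), a concurrent, hook-less LU could look like::

    class LUHypotheticalNoop(NoHooksLU):
      _OP_REQP = []             # no required opcode parameters
      REQ_BGL = False           # does not need the Big Ganeti Lock

      def ExpandNames(self):
        self.needed_locks = {}  # no locks needed at all

      def CheckPrereq(self):
        pass                    # nothing to verify

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")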
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98
    self.CheckArguments()
99

    
100
  def __GetSSH(self):
101
    """Returns the SshRunner object
102

103
    """
104
    if not self.__ssh:
105
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106
    return self.__ssh
107

    
108
  ssh = property(fget=__GetSSH)
109

    
110
  def CheckArguments(self):
111
    """Check syntactic validity for the opcode arguments.
112

113
    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.
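
    For instance, an LU may normalise an optional opcode parameter here, as
    in this sketch (compare LUSetClusterParams.CheckArguments below)::

      def CheckArguments(self):
        if not hasattr(self.op, "candidate_pool_size"):
          self.op.candidate_pool_size = None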
124

125
    """
126
    pass
127

    
128
  def ExpandNames(self):
129
    """Expand names for this LU.
130

131
    This method is called before starting to execute the opcode, and it should
132
    update all the parameters of the opcode to their canonical form (e.g. a
133
    short node name must be fully expanded after this method has successfully
134
    completed). This way locking, hooks, logging, etc. can work correctly.
135

136
    LUs which implement this method must also populate the self.needed_locks
137
    member, as a dict with lock levels as keys, and a list of needed lock names
138
    as values. Rules:
139

140
      - use an empty dict if you don't need any lock
141
      - if you don't need any lock at a particular level omit that level
142
      - don't put anything for the BGL level
143
      - if you want all locks at a level use locking.ALL_SET as a value
144

145
    If you need to share locks (rather than acquire them exclusively) at one
146
    level you can modify self.share_locks, setting a true value (usually 1) for
147
    that level. By default locks are not shared.
148

149
    Examples::
150

151
      # Acquire all nodes and one instance
152
      self.needed_locks = {
153
        locking.LEVEL_NODE: locking.ALL_SET,
154
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155
      }
156
      # Acquire just two nodes
157
      self.needed_locks = {
158
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159
      }
160
      # Acquire no locks
161
      self.needed_locks = {} # No, you can't leave it to the default value None
162

163
    """
164
    # The implementation of this method is mandatory only if the new LU is
165
    # concurrent, so that old LUs don't need to be changed all at the same
166
    # time.
167
    if self.REQ_BGL:
168
      self.needed_locks = {} # Exclusive LUs don't need locks.
169
    else:
170
      raise NotImplementedError
171

    
172
  def DeclareLocks(self, level):
173
    """Declare LU locking needs for a level
174

175
    While most LUs can just declare their locking needs at ExpandNames time,
176
    sometimes there's the need to calculate some locks after having acquired
177
    the ones before. This function is called just before acquiring locks at a
178
    particular level, but after acquiring the ones at lower levels, and permits
179
    such calculations. It can be used to modify self.needed_locks, and by
180
    default it does nothing.
181

182
    This function is only called if you have something already set in
183
    self.needed_locks for the level.
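
    A typical usage, together with ExpandNames and the _LockInstancesNodes
    helper, is sketched below::

      def ExpandNames(self):
        self._ExpandAndLockInstance()
        self.needed_locks[locking.LEVEL_NODE] = []
        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()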
184

185
    @param level: Locking level which is going to be locked
186
    @type level: member of ganeti.locking.LEVELS
187

188
    """
189

    
190
  def CheckPrereq(self):
191
    """Check prerequisites for this LU.
192

193
    This method should check that the prerequisites for the execution
194
    of this LU are fulfilled. It can do internode communication, but
195
    it should be idempotent - no cluster or system changes are
196
    allowed.
197

198
    The method should raise errors.OpPrereqError in case something is
199
    not fulfilled. Its return value is ignored.
200

201
    This method should also update all the parameters of the opcode to
202
    their canonical form if it hasn't been done by ExpandNames before.
203

204
    """
205
    raise NotImplementedError
206

    
207
  def Exec(self, feedback_fn):
208
    """Execute the LU.
209

210
    This method should implement the actual work. It should raise
211
    errors.OpExecError for failures that are somewhat dealt with in
212
    code, or expected.
213

214
    """
215
    raise NotImplementedError
216

    
217
  def BuildHooksEnv(self):
218
    """Build hooks environment for this LU.
219

220
    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled by the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes for a phase, an empty list (and not None) should
    be returned.
232

233
    Note that if the HPATH for a LU class is None, this function will
234
    not be called.
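
    A rough sketch of a typical implementation (the environment keys and the
    opcode attribute used here are illustrative; see e.g.
    LURenameCluster.BuildHooksEnv for a real one)::

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.op.instance_name}  # illustrative field
        nl = [self.cfg.GetMasterNode()]
        return env, nl, nl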
235

236
    """
237
    raise NotImplementedError
238

    
239
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240
    """Notify the LU about the results of its hooks.
241

242
    This method is called every time a hooks phase is executed, and notifies
243
    the Logical Unit about the hooks' result. The LU can then use it to alter
244
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.
247

248
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
249
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250
    @param hook_results: the results of the multi-node hooks rpc call
251
    @param feedback_fn: function used to send feedback back to the caller
252
    @param lu_result: the previous Exec result this LU had, or None
253
        in the PRE phase
254
    @return: the new Exec result, based on the previous result
255
        and hook results
256

257
    """
258
    return lu_result
259

    
260
  def _ExpandAndLockInstance(self):
261
    """Helper function to expand and lock an instance.
262

263
    Many LUs that work on an instance take its name in self.op.instance_name
264
    and need to expand it and then declare the expanded name for locking. This
265
    function does it, and then updates self.op.instance_name to the expanded
266
    name. It also initializes needed_locks as a dict, if this hasn't been done
267
    before.
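
    Typical usage from an LU's ExpandNames (sketch)::

      def ExpandNames(self):
        self._ExpandAndLockInstance()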
268

269
    """
270
    if self.needed_locks is None:
271
      self.needed_locks = {}
272
    else:
273
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274
        "_ExpandAndLockInstance called with instance-level locks set"
275
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276
    if expanded_name is None:
277
      raise errors.OpPrereqError("Instance '%s' not known" %
278
                                  self.op.instance_name)
279
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280
    self.op.instance_name = expanded_name
281

    
282
  def _LockInstancesNodes(self, primary_only=False):
283
    """Helper function to declare instances' nodes for locking.
284

285
    This function should be called after locking one or more instances to lock
286
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287
    with all primary or secondary nodes for instances already locked and
288
    present in self.needed_locks[locking.LEVEL_INSTANCE].
289

290
    It should be called from DeclareLocks, and for safety only works if
291
    self.recalculate_locks[locking.LEVEL_NODE] is set.
292

293
    In the future it may grow parameters to just lock some instance's nodes, or
294
    to just lock primaries or secondary nodes, if needed.
295

296
    It should be called in DeclareLocks in a way similar to::
297

298
      if level == locking.LEVEL_NODE:
299
        self._LockInstancesNodes()
300

301
    @type primary_only: boolean
302
    @param primary_only: only lock primary nodes of locked instances
303

304
    """
305
    assert locking.LEVEL_NODE in self.recalculate_locks, \
306
      "_LockInstancesNodes helper function called with no nodes to recalculate"
307

    
308
    # TODO: check if we've really been called with the instance locks held
309

    
310
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311
    # future we might want to have different behaviors depending on the value
312
    # of self.recalculate_locks[locking.LEVEL_NODE]
313
    wanted_nodes = []
314
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315
      instance = self.context.cfg.GetInstanceInfo(instance_name)
316
      wanted_nodes.append(instance.primary_node)
317
      if not primary_only:
318
        wanted_nodes.extend(instance.secondary_nodes)
319

    
320
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324

    
325
    del self.recalculate_locks[locking.LEVEL_NODE]
326

    
327

    
328
class NoHooksLU(LogicalUnit):
329
  """Simple LU which runs no hooks.
330

331
  This LU is intended as a parent for other LogicalUnits which will
332
  run no hooks, in order to reduce duplicate code.
333

334
  """
335
  HPATH = None
336
  HTYPE = None
337

    
338

    
339
def _GetWantedNodes(lu, nodes):
340
  """Returns list of checked and expanded node names.
341

342
  @type lu: L{LogicalUnit}
343
  @param lu: the logical unit on whose behalf we execute
344
  @type nodes: list
345
  @param nodes: list of node names or None for all nodes
346
  @rtype: list
347
  @return: the list of nodes, sorted
348
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
349

350
  """
351
  if not isinstance(nodes, list):
352
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
353

    
354
  if not nodes:
355
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356
      " non-empty list of nodes whose name is to be expanded.")
357

    
358
  wanted = []
359
  for name in nodes:
360
    node = lu.cfg.ExpandNodeName(name)
361
    if node is None:
362
      raise errors.OpPrereqError("No such node name '%s'" % name)
363
    wanted.append(node)
364

    
365
  return utils.NiceSort(wanted)
366

    
367

    
368
def _GetWantedInstances(lu, instances):
369
  """Returns list of checked and expanded instance names.
370

371
  @type lu: L{LogicalUnit}
372
  @param lu: the logical unit on whose behalf we execute
373
  @type instances: list
374
  @param instances: list of instance names or None for all instances
375
  @rtype: list
376
  @return: the list of instances, sorted
377
  @raise errors.OpPrereqError: if the instances parameter is wrong type
378
  @raise errors.OpPrereqError: if any of the passed instances is not found
379

380
  """
381
  if not isinstance(instances, list):
382
    raise errors.OpPrereqError("Invalid argument type 'instances'")
383

    
384
  if instances:
385
    wanted = []
386

    
387
    for name in instances:
388
      instance = lu.cfg.ExpandInstanceName(name)
389
      if instance is None:
390
        raise errors.OpPrereqError("No such instance name '%s'" % name)
391
      wanted.append(instance)
392

    
393
  else:
394
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395
  return wanted
396

    
397

    
398
def _CheckOutputFields(static, dynamic, selected):
399
  """Checks whether all selected fields are valid.
400

401
  @type static: L{utils.FieldSet}
402
  @param static: static fields set
403
  @type dynamic: L{utils.FieldSet}
404
  @param dynamic: dynamic fields set
405

406
  """
407
  f = utils.FieldSet()
408
  f.Extend(static)
409
  f.Extend(dynamic)
410

    
411
  delta = f.NonMatching(selected)
412
  if delta:
413
    raise errors.OpPrereqError("Unknown output fields selected: %s"
414
                               % ",".join(delta))
415

    
416

    
417
def _CheckBooleanOpField(op, name):
418
  """Validates boolean opcode parameters.
419

420
  This will ensure that an opcode parameter is either a boolean value,
421
  or None (but that it always exists).
422

423
  """
424
  val = getattr(op, name, None)
425
  if not (val is None or isinstance(val, bool)):
426
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427
                               (name, str(val)))
428
  setattr(op, name, val)
429

    
430

    
431
def _CheckNodeOnline(lu, node):
432
  """Ensure that a given node is online.
433

434
  @param lu: the LU on behalf of which we make the check
435
  @param node: the node to check
436
  @raise errors.OpPrereqError: if the node is offline
437

438
  """
439
  if lu.cfg.GetNodeInfo(node).offline:
440
    raise errors.OpPrereqError("Can't use offline node %s" % node)
441

    
442

    
443
def _CheckNodeNotDrained(lu, node):
444
  """Ensure that a given node is not drained.
445

446
  @param lu: the LU on behalf of which we make the check
447
  @param node: the node to check
448
  @raise errors.OpPrereqError: if the node is drained
449

450
  """
451
  if lu.cfg.GetNodeInfo(node).drained:
452
    raise errors.OpPrereqError("Can't use drained node %s" % node)
453

    
454

    
455
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456
                          memory, vcpus, nics, disk_template, disks):
457
  """Builds instance related env variables for hooks
458

459
  This builds the hook environment from individual variables.
460

461
  @type name: string
462
  @param name: the name of the instance
463
  @type primary_node: string
464
  @param primary_node: the name of the instance's primary node
465
  @type secondary_nodes: list
466
  @param secondary_nodes: list of secondary nodes as strings
467
  @type os_type: string
468
  @param os_type: the name of the instance's OS
469
  @type status: boolean
470
  @param status: the should_run status of the instance
471
  @type memory: string
472
  @param memory: the memory size of the instance
473
  @type vcpus: string
474
  @param vcpus: the count of VCPUs the instance has
475
  @type nics: list
476
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
478
  @type disk_template: string
479
  @param disk_template: the disk template of the instance
480
  @type disks: list
481
  @param disks: the list of (size, mode) pairs
482
  @rtype: dict
483
  @return: the hook environment for this instance
484

485
  """
486
  if status:
487
    str_status = "up"
488
  else:
489
    str_status = "down"
490
  env = {
491
    "OP_TARGET": name,
492
    "INSTANCE_NAME": name,
493
    "INSTANCE_PRIMARY": primary_node,
494
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495
    "INSTANCE_OS_TYPE": os_type,
496
    "INSTANCE_STATUS": str_status,
497
    "INSTANCE_MEMORY": memory,
498
    "INSTANCE_VCPUS": vcpus,
499
    "INSTANCE_DISK_TEMPLATE": disk_template,
500
  }
501

    
502
  if nics:
503
    nic_count = len(nics)
504
    for idx, (ip, mac, mode, link) in enumerate(nics):
505
      if ip is None:
506
        ip = ""
507
      env["INSTANCE_NIC%d_IP" % idx] = ip
508
      env["INSTANCE_NIC%d_MAC" % idx] = mac
509
      env["INSTANCE_NIC%d_MODE" % idx] = mode
510
      env["INSTANCE_NIC%d_LINK" % idx] = link
511
      if mode == constants.NIC_MODE_BRIDGED:
512
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513
  else:
514
    nic_count = 0
515

    
516
  env["INSTANCE_NIC_COUNT"] = nic_count
517

    
518
  if disks:
519
    disk_count = len(disks)
520
    for idx, (size, mode) in enumerate(disks):
521
      env["INSTANCE_DISK%d_SIZE" % idx] = size
522
      env["INSTANCE_DISK%d_MODE" % idx] = mode
523
  else:
524
    disk_count = 0
525

    
526
  env["INSTANCE_DISK_COUNT"] = disk_count
527

    
528
  return env
529

    
530
def _PreBuildNICHooksList(lu, nics):
531
  """Build a list of nic information tuples.
532

533
  This list is suitable to be passed to _BuildInstanceHookEnv.
534

535
  @type lu:  L{LogicalUnit}
536
  @param lu: the logical unit on whose behalf we execute
537
  @type nics: list of L{objects.NIC}
538
  @param nics: list of nics to convert to hooks tuples
539

540
  """
541
  hooks_nics = []
542
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543
  for nic in nics:
544
    ip = nic.ip
545
    mac = nic.mac
546
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547
    mode = filled_params[constants.NIC_MODE]
548
    link = filled_params[constants.NIC_LINK]
549
    hooks_nics.append((ip, mac, mode, link))
550
  return hooks_nics
551

    
552
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553
  """Builds instance related env variables for hooks from an object.
554

555
  @type lu: L{LogicalUnit}
556
  @param lu: the logical unit on whose behalf we execute
557
  @type instance: L{objects.Instance}
558
  @param instance: the instance for which we should build the
559
      environment
560
  @type override: dict
561
  @param override: dictionary with key/values that will override
562
      our values
563
  @rtype: dict
564
  @return: the hook environment dictionary
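
  Typically called from an LU's BuildHooksEnv, roughly like this sketch::

    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl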
565

566
  """
567
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
568
  args = {
569
    'name': instance.name,
570
    'primary_node': instance.primary_node,
571
    'secondary_nodes': instance.secondary_nodes,
572
    'os_type': instance.os,
573
    'status': instance.admin_up,
574
    'memory': bep[constants.BE_MEMORY],
575
    'vcpus': bep[constants.BE_VCPUS],
576
    'nics': _PreBuildNICHooksList(lu, instance.nics),
577
    'disk_template': instance.disk_template,
578
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
579
  }
580
  if override:
581
    args.update(override)
582
  return _BuildInstanceHookEnv(**args)
583

    
584

    
585
def _AdjustCandidatePool(lu):
586
  """Adjust the candidate pool after node operations.
587

588
  """
589
  mod_list = lu.cfg.MaintainCandidatePool()
590
  if mod_list:
591
    lu.LogInfo("Promoted nodes to master candidate role: %s",
592
               ", ".join(node.name for node in mod_list))
593
    for name in mod_list:
594
      lu.context.ReaddNode(name)
595
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596
  if mc_now > mc_max:
597
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598
               (mc_now, mc_max))
599

    
600

    
601
def _CheckNicsBridgesExist(lu, target_nics, target_node,
602
                               profile=constants.PP_DEFAULT):
603
  """Check that the brigdes needed by a list of nics exist.
604

605
  """
606
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608
                for nic in target_nics]
609
  brlist = [params[constants.NIC_LINK] for params in paramslist
610
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611
  if brlist:
612
    result = lu.rpc.call_bridges_exist(target_node, brlist)
613
    msg = result.RemoteFailMsg()
614
    if msg:
615
      raise errors.OpPrereqError("Error checking bridges on destination node"
616
                                 " '%s': %s" % (target_node, msg))
617

    
618

    
619
def _CheckInstanceBridgesExist(lu, instance, node=None):
620
  """Check that the brigdes needed by an instance exist.
621

622
  """
623
  if node is None:
624
    node = instance.primary_node
625
  _CheckNicsBridgesExist(lu, instance.nics, node)
626

    
627

    
628
class LUDestroyCluster(NoHooksLU):
629
  """Logical unit for destroying the cluster.
630

631
  """
632
  _OP_REQP = []
633

    
634
  def CheckPrereq(self):
635
    """Check prerequisites.
636

637
    This checks whether the cluster is empty.
638

639
    Any errors are signalled by raising errors.OpPrereqError.
640

641
    """
642
    master = self.cfg.GetMasterNode()
643

    
644
    nodelist = self.cfg.GetNodeList()
645
    if len(nodelist) != 1 or nodelist[0] != master:
646
      raise errors.OpPrereqError("There are still %d node(s) in"
647
                                 " this cluster." % (len(nodelist) - 1))
648
    instancelist = self.cfg.GetInstanceList()
649
    if instancelist:
650
      raise errors.OpPrereqError("There are still %d instance(s) in"
651
                                 " this cluster." % len(instancelist))
652

    
653
  def Exec(self, feedback_fn):
654
    """Destroys the cluster.
655

656
    """
657
    master = self.cfg.GetMasterNode()
658
    result = self.rpc.call_node_stop_master(master, False)
659
    result.Raise()
660
    if not result.data:
661
      raise errors.OpExecError("Could not disable the master role")
662
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663
    utils.CreateBackup(priv_key)
664
    utils.CreateBackup(pub_key)
665
    return master
666

    
667

    
668
class LUVerifyCluster(LogicalUnit):
669
  """Verifies the cluster status.
670

671
  """
672
  HPATH = "cluster-verify"
673
  HTYPE = constants.HTYPE_CLUSTER
674
  _OP_REQP = ["skip_checks"]
675
  REQ_BGL = False
676

    
677
  def ExpandNames(self):
678
    self.needed_locks = {
679
      locking.LEVEL_NODE: locking.ALL_SET,
680
      locking.LEVEL_INSTANCE: locking.ALL_SET,
681
    }
682
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683

    
684
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685
                  node_result, feedback_fn, master_files,
686
                  drbd_map, vg_name):
687
    """Run multiple tests against a node.
688

689
    Test list:
690

691
      - compares ganeti version
692
      - checks vg existence and size > 20G
693
      - checks config file checksum
694
      - checks ssh to other nodes
695

696
    @type nodeinfo: L{objects.Node}
697
    @param nodeinfo: the node to check
698
    @param file_list: required list of files
699
    @param local_cksum: dictionary of local files and their checksums
700
    @param node_result: the results from the node
701
    @param feedback_fn: function used to accumulate results
702
    @param master_files: list of files that only masters should have
703
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
706
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707

708
    """
709
    node = nodeinfo.name
710

    
711
    # main result, node_result should be a non-empty dict
712
    if not node_result or not isinstance(node_result, dict):
713
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714
      return True
715

    
716
    # compares ganeti version
717
    local_version = constants.PROTOCOL_VERSION
718
    remote_version = node_result.get('version', None)
719
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
720
            len(remote_version) == 2):
721
      feedback_fn("  - ERROR: connection to %s failed" % (node))
722
      return True
723

    
724
    if local_version != remote_version[0]:
725
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726
                  " node %s %s" % (local_version, node, remote_version[0]))
727
      return True
728

    
729
    # node seems compatible, we can actually try to look into its results
730

    
731
    bad = False
732

    
733
    # full package version
734
    if constants.RELEASE_VERSION != remote_version[1]:
735
      feedback_fn("  - WARNING: software version mismatch: master %s,"
736
                  " node %s %s" %
737
                  (constants.RELEASE_VERSION, node, remote_version[1]))
738

    
739
    # checks vg existence and size > 20G
740
    if vg_name is not None:
741
      vglist = node_result.get(constants.NV_VGLIST, None)
742
      if not vglist:
743
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744
                        (node,))
745
        bad = True
746
      else:
747
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748
                                              constants.MIN_VG_SIZE)
749
        if vgstatus:
750
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751
          bad = True
752

    
753
    # checks config file checksum
754

    
755
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
756
    if not isinstance(remote_cksum, dict):
757
      bad = True
758
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
759
    else:
760
      for file_name in file_list:
761
        node_is_mc = nodeinfo.master_candidate
762
        must_have_file = file_name not in master_files
763
        if file_name not in remote_cksum:
764
          if node_is_mc or must_have_file:
765
            bad = True
766
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
767
        elif remote_cksum[file_name] != local_cksum[file_name]:
768
          if node_is_mc or must_have_file:
769
            bad = True
770
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771
          else:
772
            # not candidate and this is not a must-have file
773
            bad = True
774
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775
                        " '%s'" % file_name)
776
        else:
777
          # all good, except non-master/non-must have combination
778
          if not node_is_mc and not must_have_file:
779
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
780
                        " candidates" % file_name)
781

    
782
    # checks ssh to any
783

    
784
    if constants.NV_NODELIST not in node_result:
785
      bad = True
786
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787
    else:
788
      if node_result[constants.NV_NODELIST]:
789
        bad = True
790
        for node in node_result[constants.NV_NODELIST]:
791
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792
                          (node, node_result[constants.NV_NODELIST][node]))
793

    
794
    if constants.NV_NODENETTEST not in node_result:
795
      bad = True
796
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797
    else:
798
      if node_result[constants.NV_NODENETTEST]:
799
        bad = True
800
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801
        for node in nlist:
802
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803
                          (node, node_result[constants.NV_NODENETTEST][node]))
804

    
805
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806
    if isinstance(hyp_result, dict):
807
      for hv_name, hv_result in hyp_result.iteritems():
808
        if hv_result is not None:
809
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810
                      (hv_name, hv_result))
811

    
812
    # check used drbd list
813
    if vg_name is not None:
814
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
815
      if not isinstance(used_minors, (tuple, list)):
816
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817
                    str(used_minors))
818
      else:
819
        for minor, (iname, must_exist) in drbd_map.items():
820
          if minor not in used_minors and must_exist:
821
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822
                        " not active" % (minor, iname))
823
            bad = True
824
        for minor in used_minors:
825
          if minor not in drbd_map:
826
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827
                        minor)
828
            bad = True
829

    
830
    return bad
831

    
832
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833
                      node_instance, feedback_fn, n_offline):
834
    """Verify an instance.
835

836
    This function checks to see if the required block devices are
837
    available on the instance's node.
838

839
    """
840
    bad = False
841

    
842
    node_current = instanceconfig.primary_node
843

    
844
    node_vol_should = {}
845
    instanceconfig.MapLVsByNode(node_vol_should)
846

    
847
    for node in node_vol_should:
848
      if node in n_offline:
849
        # ignore missing volumes on offline nodes
850
        continue
851
      for volume in node_vol_should[node]:
852
        if node not in node_vol_is or volume not in node_vol_is[node]:
853
          feedback_fn("  - ERROR: volume %s missing on node %s" %
854
                          (volume, node))
855
          bad = True
856

    
857
    if instanceconfig.admin_up:
858
      if ((node_current not in node_instance or
859
          not instance in node_instance[node_current]) and
860
          node_current not in n_offline):
861
        feedback_fn("  - ERROR: instance %s not running on node %s" %
862
                        (instance, node_current))
863
        bad = True
864

    
865
    for node in node_instance:
866
      if node != node_current:
867
        if instance in node_instance[node]:
868
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
869
                          (instance, node))
870
          bad = True
871

    
872
    return bad
873

    
874
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875
    """Verify if there are any unknown volumes in the cluster.
876

877
    The .os, .swap and backup volumes are ignored. All other volumes are
878
    reported as unknown.
879

880
    """
881
    bad = False
882

    
883
    for node in node_vol_is:
884
      for volume in node_vol_is[node]:
885
        if node not in node_vol_should or volume not in node_vol_should[node]:
886
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887
                      (volume, node))
888
          bad = True
889
    return bad
890

    
891
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892
    """Verify the list of running instances.
893

894
    This checks what instances are running but unknown to the cluster.
895

896
    """
897
    bad = False
898
    for node in node_instance:
899
      for runninginstance in node_instance[node]:
900
        if runninginstance not in instancelist:
901
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902
                          (runninginstance, node))
903
          bad = True
904
    return bad
905

    
906
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907
    """Verify N+1 Memory Resilience.
908

909
    Check that if one single node dies we can still start all the instances it
910
    was primary for.
911

912
    """
913
    bad = False
914

    
915
    for node, nodeinfo in node_info.iteritems():
916
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all the instances it is supposed to, should a
      # single other node in the cluster fail.
919
      # FIXME: not ready for failover to an arbitrary node
920
      # FIXME: does not support file-backed instances
921
      # WARNING: we currently take into account down instances as well as up
922
      # ones, considering that even if they're down someone might want to start
923
      # them even in the event of a node failure.
924
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925
        needed_mem = 0
926
        for instance in instances:
927
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928
          if bep[constants.BE_AUTO_BALANCE]:
929
            needed_mem += bep[constants.BE_MEMORY]
930
        if nodeinfo['mfree'] < needed_mem:
931
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932
                      " failovers should node %s fail" % (node, prinode))
933
          bad = True
934
    return bad
935

    
936
  def CheckPrereq(self):
937
    """Check prerequisites.
938

939
    Transform the list of checks we're going to skip into a set and check that
940
    all its members are valid.
941

942
    """
943
    self.skip_set = frozenset(self.op.skip_checks)
944
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
946

    
947
  def BuildHooksEnv(self):
948
    """Build hooks env.
949

950
    Cluster-Verify hooks just run in the post phase and their failure makes
951
    the output be logged in the verify output and the verification to fail.
952

953
    """
954
    all_nodes = self.cfg.GetNodeList()
955
    env = {
956
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957
      }
958
    for node in self.cfg.GetAllNodesInfo().values():
959
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960

    
961
    return env, [], all_nodes
962

    
963
  def Exec(self, feedback_fn):
964
    """Verify integrity of cluster, performing various test on nodes.
965

966
    """
967
    bad = False
968
    feedback_fn("* Verifying global settings")
969
    for msg in self.cfg.VerifyConfig():
970
      feedback_fn("  - ERROR: %s" % msg)
971

    
972
    vg_name = self.cfg.GetVGName()
973
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
975
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978
                        for iname in instancelist)
979
    i_non_redundant = [] # Non redundant instances
980
    i_non_a_balanced = [] # Non auto-balanced instances
981
    n_offline = [] # List of offline nodes
982
    n_drained = [] # List of nodes being drained
983
    node_volume = {}
984
    node_instance = {}
985
    node_info = {}
986
    instance_cfg = {}
987

    
988
    # FIXME: verify OS list
989
    # do local checksums
990
    master_files = [constants.CLUSTER_CONF_FILE]
991

    
992
    file_names = ssconf.SimpleStore().GetFileList()
993
    file_names.append(constants.SSL_CERT_FILE)
994
    file_names.append(constants.RAPI_CERT_FILE)
995
    file_names.extend(master_files)
996

    
997
    local_checksums = utils.FingerprintFiles(file_names)
998

    
999
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000
    node_verify_param = {
1001
      constants.NV_FILELIST: file_names,
1002
      constants.NV_NODELIST: [node.name for node in nodeinfo
1003
                              if not node.offline],
1004
      constants.NV_HYPERVISOR: hypervisors,
1005
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006
                                  node.secondary_ip) for node in nodeinfo
1007
                                 if not node.offline],
1008
      constants.NV_INSTANCELIST: hypervisors,
1009
      constants.NV_VERSION: None,
1010
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011
      }
1012
    if vg_name is not None:
1013
      node_verify_param[constants.NV_VGLIST] = None
1014
      node_verify_param[constants.NV_LVLIST] = vg_name
1015
      node_verify_param[constants.NV_DRBDLIST] = None
1016
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017
                                           self.cfg.GetClusterName())
1018

    
1019
    cluster = self.cfg.GetClusterInfo()
1020
    master_node = self.cfg.GetMasterNode()
1021
    all_drbd_map = self.cfg.ComputeDRBDMap()
1022

    
1023
    for node_i in nodeinfo:
1024
      node = node_i.name
1025

    
1026
      if node_i.offline:
1027
        feedback_fn("* Skipping offline node %s" % (node,))
1028
        n_offline.append(node)
1029
        continue
1030

    
1031
      if node == master_node:
1032
        ntype = "master"
1033
      elif node_i.master_candidate:
1034
        ntype = "master candidate"
1035
      elif node_i.drained:
1036
        ntype = "drained"
1037
        n_drained.append(node)
1038
      else:
1039
        ntype = "regular"
1040
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041

    
1042
      msg = all_nvinfo[node].RemoteFailMsg()
1043
      if msg:
1044
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045
        bad = True
1046
        continue
1047

    
1048
      nresult = all_nvinfo[node].payload
1049
      node_drbd = {}
1050
      for minor, instance in all_drbd_map[node].items():
1051
        if instance not in instanceinfo:
1052
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053
                      instance)
1054
          # ghost instance should not be running, but otherwise we
1055
          # don't give double warnings (both ghost instance and
1056
          # unallocated minor in use)
1057
          node_drbd[minor] = (instance, False)
1058
        else:
1059
          instance = instanceinfo[instance]
1060
          node_drbd[minor] = (instance.name, instance.admin_up)
1061
      result = self._VerifyNode(node_i, file_names, local_checksums,
1062
                                nresult, feedback_fn, master_files,
1063
                                node_drbd, vg_name)
1064
      bad = bad or result
1065

    
1066
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067
      if vg_name is None:
1068
        node_volume[node] = {}
1069
      elif isinstance(lvdata, basestring):
1070
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071
                    (node, utils.SafeEncode(lvdata)))
1072
        bad = True
1073
        node_volume[node] = {}
1074
      elif not isinstance(lvdata, dict):
1075
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076
        bad = True
1077
        continue
1078
      else:
1079
        node_volume[node] = lvdata
1080

    
1081
      # node_instance
1082
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1083
      if not isinstance(idata, list):
1084
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085
                    (node,))
1086
        bad = True
1087
        continue
1088

    
1089
      node_instance[node] = idata
1090

    
1091
      # node_info
1092
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093
      if not isinstance(nodeinfo, dict):
1094
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095
        bad = True
1096
        continue
1097

    
1098
      try:
1099
        node_info[node] = {
1100
          "mfree": int(nodeinfo['memory_free']),
1101
          "pinst": [],
1102
          "sinst": [],
1103
          # dictionary holding all instances this node is secondary for,
1104
          # grouped by their primary node. Each key is a cluster node, and each
1105
          # value is a list of instances which have the key as primary and the
1106
          # current node as secondary.  this is handy to calculate N+1 memory
1107
          # availability if you can only failover from a primary to its
1108
          # secondary.
1109
          "sinst-by-pnode": {},
1110
        }
1111
        # FIXME: devise a free space model for file based instances as well
1112
        if vg_name is not None:
1113
          if (constants.NV_VGLIST not in nresult or
1114
              vg_name not in nresult[constants.NV_VGLIST]):
1115
            feedback_fn("  - ERROR: node %s didn't return data for the"
1116
                        " volume group '%s' - it is either missing or broken" %
1117
                        (node, vg_name))
1118
            bad = True
1119
            continue
1120
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121
      except (ValueError, KeyError):
1122
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123
                    " from node %s" % (node,))
1124
        bad = True
1125
        continue
1126

    
1127
    node_vol_should = {}
1128

    
1129
    for instance in instancelist:
1130
      feedback_fn("* Verifying instance %s" % instance)
1131
      inst_config = instanceinfo[instance]
1132
      result = self._VerifyInstance(instance, inst_config, node_volume,
1133
                                     node_instance, feedback_fn, n_offline)
1134
      bad = bad or result
1135
      inst_nodes_offline = []
1136

    
1137
      inst_config.MapLVsByNode(node_vol_should)
1138

    
1139
      instance_cfg[instance] = inst_config
1140

    
1141
      pnode = inst_config.primary_node
1142
      if pnode in node_info:
1143
        node_info[pnode]['pinst'].append(instance)
1144
      elif pnode not in n_offline:
1145
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1146
                    " %s failed" % (instance, pnode))
1147
        bad = True
1148

    
1149
      if pnode in n_offline:
1150
        inst_nodes_offline.append(pnode)
1151

    
1152
      # If the instance is non-redundant we cannot survive losing its primary
1153
      # node, so we are not N+1 compliant. On the other hand we have no disk
1154
      # templates with more than one secondary so that situation is not well
1155
      # supported either.
1156
      # FIXME: does not support file-backed instances
1157
      if len(inst_config.secondary_nodes) == 0:
1158
        i_non_redundant.append(instance)
1159
      elif len(inst_config.secondary_nodes) > 1:
1160
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161
                    % instance)
1162

    
1163
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164
        i_non_a_balanced.append(instance)
1165

    
1166
      for snode in inst_config.secondary_nodes:
1167
        if snode in node_info:
1168
          node_info[snode]['sinst'].append(instance)
1169
          if pnode not in node_info[snode]['sinst-by-pnode']:
1170
            node_info[snode]['sinst-by-pnode'][pnode] = []
1171
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172
        elif snode not in n_offline:
1173
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174
                      " %s failed" % (instance, snode))
1175
          bad = True
1176
        if snode in n_offline:
1177
          inst_nodes_offline.append(snode)
1178

    
1179
      if inst_nodes_offline:
1180
        # warn that the instance lives on offline nodes, and set bad=True
1181
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182
                    ", ".join(inst_nodes_offline))
1183
        bad = True
1184

    
1185
    feedback_fn("* Verifying orphan volumes")
1186
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187
                                       feedback_fn)
1188
    bad = bad or result
1189

    
1190
    feedback_fn("* Verifying remaining instances")
1191
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1192
                                         feedback_fn)
1193
    bad = bad or result
1194

    
1195
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196
      feedback_fn("* Verifying N+1 Memory redundancy")
1197
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198
      bad = bad or result
1199

    
1200
    feedback_fn("* Other Notes")
1201
    if i_non_redundant:
1202
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203
                  % len(i_non_redundant))
1204

    
1205
    if i_non_a_balanced:
1206
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207
                  % len(i_non_a_balanced))
1208

    
1209
    if n_offline:
1210
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211

    
1212
    if n_drained:
1213
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214

    
1215
    return not bad
1216

    
1217
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218
    """Analize the post-hooks' result
1219

1220
    This method analyses the hook result, handles it, and sends some
1221
    nicely-formatted feedback back to the user.
1222

1223
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225
    @param hooks_results: the results of the multi-node hooks rpc call
1226
    @param feedback_fn: function used to send feedback back to the caller
1227
    @param lu_result: previous Exec result
1228
    @return: the new Exec result, based on the previous result
1229
        and hook results
1230

1231
    """
1232
    # We only really run POST phase hooks, and are only interested in
1233
    # their results
1234
    if phase == constants.HOOKS_PHASE_POST:
1235
      # Used to change hooks' output to proper indentation
1236
      indent_re = re.compile('^', re.M)
1237
      feedback_fn("* Hooks Results")
1238
      if not hooks_results:
1239
        feedback_fn("  - ERROR: general communication failure")
1240
        lu_result = 1
1241
      else:
1242
        for node_name in hooks_results:
1243
          show_node_header = True
1244
          res = hooks_results[node_name]
1245
          if res.failed or res.data is False or not isinstance(res.data, list):
1246
            if res.offline:
1247
              # no need to warn or set fail return value
1248
              continue
1249
            feedback_fn("    Communication failure in hooks execution")
1250
            lu_result = 1
1251
            continue
1252
          for script, hkr, output in res.data:
1253
            if hkr == constants.HKR_FAIL:
1254
              # The node header is only shown once, if there are
1255
              # failing hooks on that node
1256
              if show_node_header:
1257
                feedback_fn("  Node %s:" % node_name)
1258
                show_node_header = False
1259
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1260
              output = indent_re.sub('      ', output)
1261
              feedback_fn("%s" % output)
1262
              lu_result = 1
1263

    
1264
      return lu_result
1265

    
1266

    
1267
class LUVerifyDisks(NoHooksLU):
1268
  """Verifies the cluster disks status.
1269

1270
  """
1271
  _OP_REQP = []
1272
  REQ_BGL = False
1273

    
1274
  def ExpandNames(self):
1275
    self.needed_locks = {
1276
      locking.LEVEL_NODE: locking.ALL_SET,
1277
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1278
    }
1279
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280

    
1281
  def CheckPrereq(self):
1282
    """Check prerequisites.
1283

1284
    This has no prerequisites.
1285

1286
    """
1287
    pass
1288

    
1289
  def Exec(self, feedback_fn):
1290
    """Verify integrity of cluster disks.
1291

1292
    @rtype: tuple of three items
1293
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)
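
    Example return value (all node, instance and volume names are
    illustrative)::

      ({"node3.example.com": "error enumerating LVs"},
       ["instance2.example.com"],
       {"instance5.example.com": [("node1.example.com", "xenvg/disk0")]})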
1296

1297
    """
1298
    result = res_nodes, res_instances, res_missing = {}, [], {}
1299

    
1300
    vg_name = self.cfg.GetVGName()
1301
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1302
    instances = [self.cfg.GetInstanceInfo(name)
1303
                 for name in self.cfg.GetInstanceList()]
1304

    
1305
    nv_dict = {}
1306
    for inst in instances:
1307
      inst_lvs = {}
1308
      if (not inst.admin_up or
1309
          inst.disk_template not in constants.DTS_NET_MIRROR):
1310
        continue
1311
      inst.MapLVsByNode(inst_lvs)
1312
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1313
      for node, vol_list in inst_lvs.iteritems():
1314
        for vol in vol_list:
1315
          nv_dict[(node, vol)] = inst
1316

    
1317
    if not nv_dict:
1318
      return result
1319

    
1320
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1321

    
1322
    to_act = set()
1323
    for node in nodes:
1324
      # node_volume
1325
      node_res = node_lvs[node]
1326
      if node_res.offline:
1327
        continue
1328
      msg = node_res.RemoteFailMsg()
1329
      if msg:
1330
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1331
        res_nodes[node] = msg
1332
        continue
1333

    
1334
      lvs = node_res.payload
1335
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1336
        inst = nv_dict.pop((node, lv_name), None)
1337
        if (not lv_online and inst is not None
1338
            and inst.name not in res_instances):
1339
          res_instances.append(inst.name)
1340

    
1341
    # any leftover items in nv_dict are missing LVs, let's arrange the
1342
    # data better
1343
    for key, inst in nv_dict.iteritems():
1344
      if inst.name not in res_missing:
1345
        res_missing[inst.name] = []
1346
      res_missing[inst.name].append(key)
1347

    
1348
    return result
1349

    
1350

    
1351
class LURenameCluster(LogicalUnit):
1352
  """Rename the cluster.
1353

1354
  """
1355
  HPATH = "cluster-rename"
1356
  HTYPE = constants.HTYPE_CLUSTER
1357
  _OP_REQP = ["name"]
1358

    
1359
  def BuildHooksEnv(self):
1360
    """Build hooks env.
1361

1362
    """
1363
    env = {
1364
      "OP_TARGET": self.cfg.GetClusterName(),
1365
      "NEW_NAME": self.op.name,
1366
      }
1367
    mn = self.cfg.GetMasterNode()
1368
    return env, [mn], [mn]
1369

    
1370
  def CheckPrereq(self):
1371
    """Verify that the passed name is a valid one.
1372

1373
    """
1374
    hostname = utils.HostInfo(self.op.name)
1375

    
1376
    new_name = hostname.name
1377
    self.ip = new_ip = hostname.ip
1378
    old_name = self.cfg.GetClusterName()
1379
    old_ip = self.cfg.GetMasterIP()
1380
    if new_name == old_name and new_ip == old_ip:
1381
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1382
                                 " cluster has changed")
1383
    if new_ip != old_ip:
1384
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1385
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1386
                                   " reachable on the network. Aborting." %
1387
                                   new_ip)
1388

    
1389
    self.op.name = new_name
1390

    
1391
  def Exec(self, feedback_fn):
1392
    """Rename the cluster.
1393

1394
    """
1395
    clustername = self.op.name
1396
    ip = self.ip
1397

    
1398
    # shutdown the master IP
1399
    master = self.cfg.GetMasterNode()
1400
    result = self.rpc.call_node_stop_master(master, False)
1401
    if result.failed or not result.data:
1402
      raise errors.OpExecError("Could not disable the master role")
1403

    
1404
    try:
1405
      cluster = self.cfg.GetClusterInfo()
1406
      cluster.cluster_name = clustername
1407
      cluster.master_ip = ip
1408
      self.cfg.Update(cluster)
1409

    
1410
      # update the known hosts file
1411
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1412
      node_list = self.cfg.GetNodeList()
1413
      try:
1414
        node_list.remove(master)
1415
      except ValueError:
1416
        pass
1417
      result = self.rpc.call_upload_file(node_list,
1418
                                         constants.SSH_KNOWN_HOSTS_FILE)
1419
      for to_node, to_result in result.iteritems():
1420
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)
1425

    
1426
    finally:
1427
      result = self.rpc.call_node_start_master(master, False)
1428
      if result.failed or not result.data:
1429
        self.LogWarning("Could not re-enable the master role on"
1430
                        " the master, please restart manually.")
1431

    
1432

    
1433
def _RecursiveCheckIfLVMBased(disk):
1434
  """Check if the given disk or its children are lvm-based.
1435

1436
  @type disk: L{objects.Disk}
1437
  @param disk: the disk to check
1438
  @rtype: boolean
1439
  @return: boolean indicating whether a LD_LV dev_type was found or not
1440

1441
  """
1442
  if disk.children:
1443
    for chdisk in disk.children:
1444
      if _RecursiveCheckIfLVMBased(chdisk):
1445
        return True
1446
  return disk.dev_type == constants.LD_LV
1447

    
1448

    
1449
class LUSetClusterParams(LogicalUnit):
1450
  """Change the parameters of the cluster.
1451

1452
  """
1453
  HPATH = "cluster-modify"
1454
  HTYPE = constants.HTYPE_CLUSTER
1455
  _OP_REQP = []
1456
  REQ_BGL = False
1457

    
1458
  def CheckArguments(self):
1459
    """Check parameters
1460

1461
    """
1462
    if not hasattr(self.op, "candidate_pool_size"):
1463
      self.op.candidate_pool_size = None
1464
    if self.op.candidate_pool_size is not None:
1465
      try:
1466
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1467
      except (ValueError, TypeError), err:
1468
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1469
                                   str(err))
1470
      if self.op.candidate_pool_size < 1:
1471
        raise errors.OpPrereqError("At least one master candidate needed")
1472

    
1473
  def ExpandNames(self):
1474
    # FIXME: in the future maybe other cluster params won't require checking on
1475
    # all nodes to be modified.
1476
    self.needed_locks = {
1477
      locking.LEVEL_NODE: locking.ALL_SET,
1478
    }
1479
    self.share_locks[locking.LEVEL_NODE] = 1
1480

    
1481
  def BuildHooksEnv(self):
1482
    """Build hooks env.
1483

1484
    """
1485
    env = {
1486
      "OP_TARGET": self.cfg.GetClusterName(),
1487
      "NEW_VG_NAME": self.op.vg_name,
1488
      }
1489
    mn = self.cfg.GetMasterNode()
1490
    return env, [mn], [mn]
1491

    
1492
  def CheckPrereq(self):
1493
    """Check prerequisites.
1494

1495
    This checks whether the given params don't conflict and
1496
    if the given volume group is valid.
1497

1498
    """
1499
    if self.op.vg_name is not None and not self.op.vg_name:
1500
      instances = self.cfg.GetAllInstancesInfo().values()
1501
      for inst in instances:
1502
        for disk in inst.disks:
1503
          if _RecursiveCheckIfLVMBased(disk):
1504
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1505
                                       " lvm-based instances exist")
1506

    
1507
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1508

    
1509
    # if vg_name not None, checks given volume group on all nodes
1510
    if self.op.vg_name:
1511
      vglist = self.rpc.call_vg_list(node_list)
1512
      for node in node_list:
1513
        msg = vglist[node].RemoteFailMsg()
1514
        if msg:
1515
          # ignoring down node
1516
          self.LogWarning("Error while gathering data on node %s"
1517
                          " (ignoring node): %s", node, msg)
1518
          continue
1519
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1520
                                              self.op.vg_name,
1521
                                              constants.MIN_VG_SIZE)
1522
        if vgstatus:
1523
          raise errors.OpPrereqError("Error on node '%s': %s" %
1524
                                     (node, vgstatus))
1525

    
1526
    self.cluster = cluster = self.cfg.GetClusterInfo()
1527
    # validate params changes
1528
    if self.op.beparams:
1529
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1530
      self.new_beparams = objects.FillDict(
1531
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1532

    
1533
    if self.op.nicparams:
1534
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1535
      self.new_nicparams = objects.FillDict(
1536
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1537
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1538

    
1539
    # hypervisor list/parameters
1540
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1541
    if self.op.hvparams:
1542
      if not isinstance(self.op.hvparams, dict):
1543
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1544
      for hv_name, hv_dict in self.op.hvparams.items():
1545
        if hv_name not in self.new_hvparams:
1546
          self.new_hvparams[hv_name] = hv_dict
1547
        else:
1548
          self.new_hvparams[hv_name].update(hv_dict)
1549

    
1550
    if self.op.enabled_hypervisors is not None:
1551
      self.hv_list = self.op.enabled_hypervisors
1552
    else:
1553
      self.hv_list = cluster.enabled_hypervisors
1554

    
1555
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1556
      # either the enabled list has changed, or the parameters have, validate
1557
      for hv_name, hv_params in self.new_hvparams.items():
1558
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1559
            (self.op.enabled_hypervisors and
1560
             hv_name in self.op.enabled_hypervisors)):
1561
          # either this is a new hypervisor, or its parameters have changed
1562
          hv_class = hypervisor.GetHypervisor(hv_name)
1563
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1564
          hv_class.CheckParameterSyntax(hv_params)
1565
          _CheckHVParams(self, node_list, hv_name, hv_params)
1566

    
1567
  def Exec(self, feedback_fn):
1568
    """Change the parameters of the cluster.
1569

1570
    """
1571
    if self.op.vg_name is not None:
1572
      new_volume = self.op.vg_name
1573
      if not new_volume:
1574
        new_volume = None
1575
      if new_volume != self.cfg.GetVGName():
1576
        self.cfg.SetVGName(new_volume)
1577
      else:
1578
        feedback_fn("Cluster LVM configuration already in desired"
1579
                    " state, not changing")
1580
    if self.op.hvparams:
1581
      self.cluster.hvparams = self.new_hvparams
1582
    if self.op.enabled_hypervisors is not None:
1583
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1584
    if self.op.beparams:
1585
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1586
    if self.op.nicparams:
1587
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1588

    
1589
    if self.op.candidate_pool_size is not None:
1590
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1591

    
1592
    self.cfg.Update(self.cluster)
1593

    
1594
    # we want to update nodes after the cluster so that if any errors
1595
    # happen, we have recorded and saved the cluster info
1596
    if self.op.candidate_pool_size is not None:
1597
      _AdjustCandidatePool(self)
1598

    
1599

    
1600
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1601
  """Distribute additional files which are part of the cluster configuration.
1602

1603
  ConfigWriter takes care of distributing the config and ssconf files, but
1604
  there are more files which should be distributed to all nodes. This function
1605
  makes sure those are copied.
1606

1607
  @param lu: calling logical unit
1608
  @param additional_nodes: list of nodes not in the config to distribute to
1609

1610
  """
1611
  # 1. Gather target nodes
1612
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1613
  dist_nodes = lu.cfg.GetNodeList()
1614
  if additional_nodes is not None:
1615
    dist_nodes.extend(additional_nodes)
1616
  if myself.name in dist_nodes:
1617
    dist_nodes.remove(myself.name)
1618
  # 2. Gather files to distribute
1619
  dist_files = set([constants.ETC_HOSTS,
1620
                    constants.SSH_KNOWN_HOSTS_FILE,
1621
                    constants.RAPI_CERT_FILE,
1622
                    constants.RAPI_USERS_FILE,
1623
                   ])
1624

    
1625
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1626
  for hv_name in enabled_hypervisors:
1627
    hv_class = hypervisor.GetHypervisor(hv_name)
1628
    dist_files.update(hv_class.GetAncillaryFiles())
1629

    
1630
  # 3. Perform the files upload
1631
  for fname in dist_files:
1632
    if os.path.exists(fname):
1633
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1634
      for to_node, to_result in result.items():
1635
         msg = to_result.RemoteFailMsg()
1636
         if msg:
1637
           msg = ("Copy of file %s to node %s failed: %s" %
1638
                   (fname, to_node, msg))
1639
           lu.proc.LogWarning(msg)
1640

    
1641

    
1642
class LURedistributeConfig(NoHooksLU):
1643
  """Force the redistribution of cluster configuration.
1644

1645
  This is a very simple LU.
1646

1647
  """
1648
  _OP_REQP = []
1649
  REQ_BGL = False
1650

    
1651
  def ExpandNames(self):
1652
    self.needed_locks = {
1653
      locking.LEVEL_NODE: locking.ALL_SET,
1654
    }
1655
    self.share_locks[locking.LEVEL_NODE] = 1
1656

    
1657
  def CheckPrereq(self):
1658
    """Check prerequisites.
1659

1660
    """
1661

    
1662
  def Exec(self, feedback_fn):
1663
    """Redistribute the configuration.
1664

1665
    """
1666
    self.cfg.Update(self.cfg.GetClusterInfo())
1667
    _RedistributeAncillaryFiles(self)
1668

    
1669

    
1670
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1671
  """Sleep and poll for an instance's disk to sync.
1672

1673
  """
1674
  if not instance.disks:
1675
    return True
1676

    
1677
  if not oneshot:
1678
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1679

    
1680
  node = instance.primary_node
1681

    
1682
  for dev in instance.disks:
1683
    lu.cfg.SetDiskID(dev, node)
1684

    
1685
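  # poll the mirror status; up to 10 consecutive RPC failures are
  # retried (sleeping 6 seconds in between) before giving up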
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
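  # field 5 of the blockdev_find result is the overall is_degraded
  # flag, field 6 the ldisk (local storage) status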
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

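    # build the node->instance mappings only if instance-related
    # fields were requested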
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

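    # the new node becomes a master candidate only if the candidate
    # pool is not already full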
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    mc_now, _ = self.cfg.GetMasterCandidateStats()
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot transfer ssh keys to the"
                               " new node: %s" % msg)

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Failure checking secondary ip"
                                   " on node %s: %s" % (new_node.name, msg))
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      msg = result[verifier].RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot communicate with node %s: %s" %
                                 (verifier, msg))
      nl_payload = result[verifier].payload['nodelist']
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resource option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
                        for hypervisor in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
  ins_l = ins_l[pnode]
  msg = ins_l.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  msg = nodeinfo[node].RemoteFailMsg()
  if msg:
    raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    msg = remote_info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Error checking node %s: %s" %
                                 (instance.primary_node, msg))
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


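# Editor's note (illustrative sketch, not part of the original module): the
# hvparams handling in LUStartupInstance.CheckPrereq layers three levels of
# settings; with plain dicts and made-up parameter names the merge behaves
# roughly like this:
#
#   cluster_hv  = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}
#   instance_hv = {"root_path": "/dev/xvda1"}
#   one_off_hv  = {"kernel_path": "/boot/vmlinuz-test"}   # from the opcode
#   filled = objects.FillDict(cluster_hv, instance_hv)  # instance beats cluster
#   filled.update(one_off_hv)                            # opcode beats both
#   # filled == {"kernel_path": "/boot/vmlinuz-test", "root_path": "/dev/xvda1"}
#
# The merged result is syntax-checked locally and verified on all of the
# instance's nodes via _CheckHVParams before the start RPC is issued.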
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


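# Editor's note (illustrative sketch, not part of the original module): the
# three reboot types accepted by LURebootInstance map to two code paths above:
#
#   soft/hard -> a single call_instance_reboot RPC on the primary node
#   full      -> call_instance_shutdown, then disk shutdown/startup, then
#                call_instance_start (i.e. a cold restart of the instance)
#
# Assuming the usual opcode attribute names, a client would request a hard
# reboot with something like:
#
#   op = opcodes.OpRebootInstance(instance_name="instance1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_HARD,
#                                 ignore_secondaries=False)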
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    msg = remote_info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Error checking node %s: %s" %
                                 (instance.primary_node, msg))
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    msg = remote_info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Error checking node %s: %s" %
                                 (instance.primary_node, msg))
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


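# Editor's note (illustrative, not part of the original module): several LUs
# above and below share the same two-phase locking idiom: ExpandNames locks
# only the instance, declares an empty node-lock list and LOCKS_REPLACE, and
# DeclareLocks later narrows the node level to the instance's own nodes once
# the instance lock is held, roughly:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()   # replaces [] with the instance's nodes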
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed or result.RemoteFailMsg():
          bad_nodes.append(name)
        else:
          if result.payload:
            live_data.update(result.payload)
          # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


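# Editor's note (illustrative sketch, not part of the original module): the
# parametrized fields of LUQueryInstances are regular expressions, so a
# request such as "disk.size/1" or "nic.mac/0" is matched by _FIELDS_STATIC
# and the captured groups drive the dispatch in Exec, roughly:
#
#   import re
#   m = re.match(r"(disk)\.(size)/([0-9]+)", "disk.size/1")
#   m.groups()   # -> ('disk', 'size', '1'): group 0 picks the disk branch,
#                #    group 1 the attribute, group 2 the index
#
# Fields such as "oper_state", "oper_ram" and "status" are dynamic: selecting
# any of them sets do_node_query, which triggers the live RPC data gathering.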
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


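# Editor's note (illustrative, not part of the original module): failover and
# migration both move an instance to its secondary node, but LUFailoverInstance
# above does a cold move (shutdown on the primary, start on the secondary),
# while LUMigrateInstance below keeps the instance running by switching the
# DRBD disks into dual-master mode and live-migrating, which is why it is
# restricted to the drbd8 disk template. Via the usual CLI front-ends this is
# roughly:
#
#   gnt-instance failover instance1.example.com   # cold move (shutdown+start)
#   gnt-instance migrate  instance1.example.com   # live move (drbd8 only)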
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)