lib/cmdlib.py @ 3eccac06
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import tempfile
30
import re
31
import platform
32
import logging
33
import copy
34
import random
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45
from ganeti import ssconf
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98
    self.CheckArguments()
99

    
100
  def __GetSSH(self):
101
    """Returns the SshRunner object
102

103
    """
104
    if not self.__ssh:
105
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106
    return self.__ssh
107

    
108
  ssh = property(fget=__GetSSH)
109

    
110
  def CheckArguments(self):
111
    """Check syntactic validity for the opcode arguments.
112

113
    This method is for doing a simple syntactic check and ensuring the
114
    validity of opcode parameters, without any cluster-related
115
    checks. While the same can be accomplished in ExpandNames and/or
116
    CheckPrereq, doing it separately is better because:
117

118
      - ExpandNames is left as a purely lock-related function
119
      - CheckPrereq is run after we have acquired locks (and possibly
120
        waited for them)
121

122
    The function is allowed to change the self.op attribute so that
123
    later methods no longer need to worry about missing parameters.
124

125
    """
126
    pass
127

    
128
  def ExpandNames(self):
129
    """Expand names for this LU.
130

131
    This method is called before starting to execute the opcode, and it should
132
    update all the parameters of the opcode to their canonical form (e.g. a
133
    short node name must be fully expanded after this method has successfully
134
    completed). This way locking, hooks, logging, etc. can work correctly.
135

136
    LUs which implement this method must also populate the self.needed_locks
137
    member, as a dict with lock levels as keys, and a list of needed lock names
138
    as values. Rules:
139

140
      - use an empty dict if you don't need any lock
141
      - if you don't need any lock at a particular level omit that level
142
      - don't put anything for the BGL level
143
      - if you want all locks at a level use locking.ALL_SET as a value
144

145
    If you need to share locks (rather than acquire them exclusively) at one
146
    level you can modify self.share_locks, setting a true value (usually 1) for
147
    that level. By default locks are not shared.
148

149
    Examples::
150

151
      # Acquire all nodes and one instance
152
      self.needed_locks = {
153
        locking.LEVEL_NODE: locking.ALL_SET,
154
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155
      }
156
      # Acquire just two nodes
157
      self.needed_locks = {
158
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159
      }
160
      # Acquire no locks
161
      self.needed_locks = {} # No, you can't leave it to the default value None
162

163
    """
164
    # The implementation of this method is mandatory only if the new LU is
165
    # concurrent, so that old LUs don't need to be changed all at the same
166
    # time.
167
    if self.REQ_BGL:
168
      self.needed_locks = {} # Exclusive LUs don't need locks.
169
    else:
170
      raise NotImplementedError
171

    
172
  def DeclareLocks(self, level):
173
    """Declare LU locking needs for a level
174

175
    While most LUs can just declare their locking needs at ExpandNames time,
176
    sometimes there's the need to calculate some locks after having acquired
177
    the ones before. This function is called just before acquiring locks at a
178
    particular level, but after acquiring the ones at lower levels, and permits
179
    such calculations. It can be used to modify self.needed_locks, and by
180
    default it does nothing.
181

182
    This function is only called if you have something already set in
183
    self.needed_locks for the level.
184

185
    @param level: Locking level which is going to be locked
186
    @type level: member of ganeti.locking.LEVELS
187

188
    """
189

    
190
  def CheckPrereq(self):
191
    """Check prerequisites for this LU.
192

193
    This method should check that the prerequisites for the execution
194
    of this LU are fulfilled. It can do internode communication, but
195
    it should be idempotent - no cluster or system changes are
196
    allowed.
197

198
    The method should raise errors.OpPrereqError in case something is
199
    not fulfilled. Its return value is ignored.
200

201
    This method should also update all the parameters of the opcode to
202
    their canonical form if it hasn't been done by ExpandNames before.
203

204
    """
205
    raise NotImplementedError
206

    
207
  def Exec(self, feedback_fn):
208
    """Execute the LU.
209

210
    This method should implement the actual work. It should raise
211
    errors.OpExecError for failures that are somewhat dealt with in
212
    code, or expected.
213

214
    """
215
    raise NotImplementedError
216

    
217
  def BuildHooksEnv(self):
218
    """Build hooks environment for this LU.
219

220
    This method should return a three-element tuple consisting of: a dict
221
    containing the environment that will be used for running the
222
    specific hook for this LU, a list of node names on which the hook
223
    should run before the execution, and a list of node names on which
224
    the hook should run after the execution.
225

226
    The keys of the dict must not be prefixed with 'GANETI_', as this will
227
    be handled in the hooks runner. Also note additional keys will be
228
    added by the hooks runner. If the LU doesn't define any
229
    environment, an empty dict (and not None) should be returned.
230

231
    If there are no nodes, an empty list (and not None) should be returned.
232

233
    Note that if the HPATH for a LU class is None, this function will
234
    not be called.
235

236
    """
237
    raise NotImplementedError
238

    
239
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240
    """Notify the LU about the results of its hooks.
241

242
    This method is called every time a hooks phase is executed, and notifies
243
    the Logical Unit about the hooks' result. The LU can then use it to alter
244
    its result based on the hooks.  By default the method does nothing and the
245
    previous result is passed back unchanged, but any LU can override it if it
246
    wants to use the local cluster hook-scripts somehow.
247

248
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
249
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250
    @param hook_results: the results of the multi-node hooks rpc call
251
    @param feedback_fn: function used to send feedback back to the caller
252
    @param lu_result: the previous Exec result this LU had, or None
253
        in the PRE phase
254
    @return: the new Exec result, based on the previous result
255
        and hook results
256

257
    """
258
    return lu_result
259

    
260
  def _ExpandAndLockInstance(self):
261
    """Helper function to expand and lock an instance.
262

263
    Many LUs that work on an instance take its name in self.op.instance_name
264
    and need to expand it and then declare the expanded name for locking. This
265
    function does it, and then updates self.op.instance_name to the expanded
266
    name. It also initializes needed_locks as a dict, if this hasn't been done
267
    before.
268

269
    """
270
    if self.needed_locks is None:
271
      self.needed_locks = {}
272
    else:
273
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274
        "_ExpandAndLockInstance called with instance-level locks set"
275
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276
    if expanded_name is None:
277
      raise errors.OpPrereqError("Instance '%s' not known" %
278
                                  self.op.instance_name)
279
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280
    self.op.instance_name = expanded_name
281

    
282
  def _LockInstancesNodes(self, primary_only=False):
283
    """Helper function to declare instances' nodes for locking.
284

285
    This function should be called after locking one or more instances to lock
286
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287
    with all primary or secondary nodes for instances already locked and
288
    present in self.needed_locks[locking.LEVEL_INSTANCE].
289

290
    It should be called from DeclareLocks, and for safety only works if
291
    self.recalculate_locks[locking.LEVEL_NODE] is set.
292

293
    In the future it may grow parameters to just lock some instance's nodes, or
294
    to just lock primaries or secondary nodes, if needed.
295

296
    It should be called in DeclareLocks in a way similar to::
297

298
      if level == locking.LEVEL_NODE:
299
        self._LockInstancesNodes()
300

301
    @type primary_only: boolean
302
    @param primary_only: only lock primary nodes of locked instances
303

304
    """
305
    assert locking.LEVEL_NODE in self.recalculate_locks, \
306
      "_LockInstancesNodes helper function called with no nodes to recalculate"
307

    
308
    # TODO: check if we've really been called with the instance locks held
309

    
310
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311
    # future we might want to have different behaviors depending on the value
312
    # of self.recalculate_locks[locking.LEVEL_NODE]
313
    wanted_nodes = []
314
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315
      instance = self.context.cfg.GetInstanceInfo(instance_name)
316
      wanted_nodes.append(instance.primary_node)
317
      if not primary_only:
318
        wanted_nodes.extend(instance.secondary_nodes)
319

    
320
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324

    
325
    del self.recalculate_locks[locking.LEVEL_NODE]
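
# Illustrative sketch only, assuming a hypothetical opcode that carries an
# "instance_name" attribute: a minimal concurrent LU showing how the methods
# documented above (ExpandNames, DeclareLocks, CheckPrereq, Exec) and the
# locking helpers fit together.  It is not registered with the opcode
# dispatcher and is not part of the module's actual functionality.

class LUExampleInstanceNodes(LogicalUnit):
  """Example LU: report the nodes of one instance while holding its locks.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # canonicalize self.op.instance_name and declare the instance lock
    self._ExpandAndLockInstance()
    # the node locks can only be computed once the instance lock is held, so
    # declare an empty list here and recalculate it in DeclareLocks
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # holding the instance lock guarantees the configuration entry exists
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s uses primary node %s" %
                (self.instance.name, self.instance.primary_node))
    return [self.instance.primary_node] + list(self.instance.secondary_nodes)
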
326

    
327

    
328
class NoHooksLU(LogicalUnit):
329
  """Simple LU which runs no hooks.
330

331
  This LU is intended as a parent for other LogicalUnits which will
332
  run no hooks, in order to reduce duplicate code.
333

334
  """
335
  HPATH = None
336
  HTYPE = None
337

    
338

    
339
def _GetWantedNodes(lu, nodes):
340
  """Returns list of checked and expanded node names.
341

342
  @type lu: L{LogicalUnit}
343
  @param lu: the logical unit on whose behalf we execute
344
  @type nodes: list
345
  @param nodes: non-empty list of node names to be expanded
346
  @rtype: list
347
  @return: the list of nodes, sorted
348
  @raise errors.OpPrereqError: if the nodes parameter is of a wrong type
349

350
  """
351
  if not isinstance(nodes, list):
352
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
353

    
354
  if not nodes:
355
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356
      " non-empty list of nodes whose name is to be expanded.")
357

    
358
  wanted = []
359
  for name in nodes:
360
    node = lu.cfg.ExpandNodeName(name)
361
    if node is None:
362
      raise errors.OpPrereqError("No such node name '%s'" % name)
363
    wanted.append(node)
364

    
365
  return utils.NiceSort(wanted)
366

    
367

    
368
def _GetWantedInstances(lu, instances):
369
  """Returns list of checked and expanded instance names.
370

371
  @type lu: L{LogicalUnit}
372
  @param lu: the logical unit on whose behalf we execute
373
  @type instances: list
374
  @param instances: list of instance names or None for all instances
375
  @rtype: list
376
  @return: the list of instances, sorted
377
  @raise errors.OpPrereqError: if the instances parameter is wrong type
378
  @raise errors.OpPrereqError: if any of the passed instances is not found
379

380
  """
381
  if not isinstance(instances, list):
382
    raise errors.OpPrereqError("Invalid argument type 'instances'")
383

    
384
  if instances:
385
    wanted = []
386

    
387
    for name in instances:
388
      instance = lu.cfg.ExpandInstanceName(name)
389
      if instance is None:
390
        raise errors.OpPrereqError("No such instance name '%s'" % name)
391
      wanted.append(instance)
392

    
393
  else:
394
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395
  return wanted
396

    
397

    
398
def _CheckOutputFields(static, dynamic, selected):
399
  """Checks whether all selected fields are valid.
400

401
  @type static: L{utils.FieldSet}
402
  @param static: static fields set
403
  @type dynamic: L{utils.FieldSet}
404
  @param dynamic: dynamic fields set
405

406
  """
407
  f = utils.FieldSet()
408
  f.Extend(static)
409
  f.Extend(dynamic)
410

    
411
  delta = f.NonMatching(selected)
412
  if delta:
413
    raise errors.OpPrereqError("Unknown output fields selected: %s"
414
                               % ",".join(delta))
415

    
416

    
417
def _CheckBooleanOpField(op, name):
418
  """Validates boolean opcode parameters.
419

420
  This will ensure that an opcode parameter is either a boolean value,
421
  or None (but that it always exists).
422

423
  """
424
  val = getattr(op, name, None)
425
  if not (val is None or isinstance(val, bool)):
426
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427
                               (name, str(val)))
428
  setattr(op, name, val)
429

    
430

    
431
def _CheckNodeOnline(lu, node):
432
  """Ensure that a given node is online.
433

434
  @param lu: the LU on behalf of which we make the check
435
  @param node: the node to check
436
  @raise errors.OpPrereqError: if the node is offline
437

438
  """
439
  if lu.cfg.GetNodeInfo(node).offline:
440
    raise errors.OpPrereqError("Can't use offline node %s" % node)
441

    
442

    
443
def _CheckNodeNotDrained(lu, node):
444
  """Ensure that a given node is not drained.
445

446
  @param lu: the LU on behalf of which we make the check
447
  @param node: the node to check
448
  @raise errors.OpPrereqError: if the node is drained
449

450
  """
451
  if lu.cfg.GetNodeInfo(node).drained:
452
    raise errors.OpPrereqError("Can't use drained node %s" % node)
453

    
454

    
455
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456
                          memory, vcpus, nics, disk_template, disks):
457
  """Builds instance related env variables for hooks
458

459
  This builds the hook environment from individual variables.
460

461
  @type name: string
462
  @param name: the name of the instance
463
  @type primary_node: string
464
  @param primary_node: the name of the instance's primary node
465
  @type secondary_nodes: list
466
  @param secondary_nodes: list of secondary nodes as strings
467
  @type os_type: string
468
  @param os_type: the name of the instance's OS
469
  @type status: boolean
470
  @param status: the should_run status of the instance
471
  @type memory: string
472
  @param memory: the memory size of the instance
473
  @type vcpus: string
474
  @param vcpus: the count of VCPUs the instance has
475
  @type nics: list
476
  @param nics: list of tuples (ip, mac, mode, link) representing
477
      the NICs the instance  has
478
  @type disk_template: string
479
  @param disk_template: the disk template of the instance
480
  @type disks: list
481
  @param disks: the list of (size, mode) pairs
482
  @rtype: dict
483
  @return: the hook environment for this instance
484

485
  """
486
  if status:
487
    str_status = "up"
488
  else:
489
    str_status = "down"
490
  env = {
491
    "OP_TARGET": name,
492
    "INSTANCE_NAME": name,
493
    "INSTANCE_PRIMARY": primary_node,
494
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495
    "INSTANCE_OS_TYPE": os_type,
496
    "INSTANCE_STATUS": str_status,
497
    "INSTANCE_MEMORY": memory,
498
    "INSTANCE_VCPUS": vcpus,
499
    "INSTANCE_DISK_TEMPLATE": disk_template,
500
  }
501

    
502
  if nics:
503
    nic_count = len(nics)
504
    for idx, (ip, mac, mode, link) in enumerate(nics):
505
      if ip is None:
506
        ip = ""
507
      env["INSTANCE_NIC%d_IP" % idx] = ip
508
      env["INSTANCE_NIC%d_MAC" % idx] = mac
509
      env["INSTANCE_NIC%d_MODE" % idx] = mode
510
      env["INSTANCE_NIC%d_LINK" % idx] = link
511
      if mode == constants.NIC_MODE_BRIDGED:
512
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513
  else:
514
    nic_count = 0
515

    
516
  env["INSTANCE_NIC_COUNT"] = nic_count
517

    
518
  if disks:
519
    disk_count = len(disks)
520
    for idx, (size, mode) in enumerate(disks):
521
      env["INSTANCE_DISK%d_SIZE" % idx] = size
522
      env["INSTANCE_DISK%d_MODE" % idx] = mode
523
  else:
524
    disk_count = 0
525

    
526
  env["INSTANCE_DISK_COUNT"] = disk_count
527

    
528
  return env
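
# For illustration only: with one bridged NIC and a single disk, the function
# above produces an environment of roughly the following shape (all concrete
# names and values here are hypothetical):
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
#                         "debootstrap", True, 512, 1,
#                         [("192.0.2.10", "aa:00:00:31:a2:55",
#                           constants.NIC_MODE_BRIDGED, "xen-br0")],
#                         constants.DT_PLAIN, [(10240, "rw")])
#   ==> {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "",
#     "INSTANCE_OS_TYPE": "debootstrap",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": constants.DT_PLAIN,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_MAC": "aa:00:00:31:a2:55",
#     "INSTANCE_NIC0_MODE": constants.NIC_MODE_BRIDGED,
#     "INSTANCE_NIC0_LINK": "xen-br0",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     "INSTANCE_DISK0_MODE": "rw",
#   }
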
529

    
530
def _PreBuildNICHooksList(lu, nics):
531
  """Build a list of nic information tuples.
532

533
  This list is suitable to be passed to _BuildInstanceHookEnv.
534

535
  @type lu:  L{LogicalUnit}
536
  @param lu: the logical unit on whose behalf we execute
537
  @type nics: list of L{objects.NIC}
538
  @param nics: list of nics to convert to hooks tuples
539

540
  """
541
  hooks_nics = []
542
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543
  for nic in nics:
544
    ip = nic.ip
545
    mac = nic.mac
546
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547
    mode = filled_params[constants.NIC_MODE]
548
    link = filled_params[constants.NIC_LINK]
549
    hooks_nics.append((ip, mac, mode, link))
550
  return hooks_nics
551

    
552
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553
  """Builds instance related env variables for hooks from an object.
554

555
  @type lu: L{LogicalUnit}
556
  @param lu: the logical unit on whose behalf we execute
557
  @type instance: L{objects.Instance}
558
  @param instance: the instance for which we should build the
559
      environment
560
  @type override: dict
561
  @param override: dictionary with key/values that will override
562
      our values
563
  @rtype: dict
564
  @return: the hook environment dictionary
565

566
  """
567
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
568
  args = {
569
    'name': instance.name,
570
    'primary_node': instance.primary_node,
571
    'secondary_nodes': instance.secondary_nodes,
572
    'os_type': instance.os,
573
    'status': instance.admin_up,
574
    'memory': bep[constants.BE_MEMORY],
575
    'vcpus': bep[constants.BE_VCPUS],
576
    'nics': _PreBuildNICHooksList(lu, instance.nics),
577
    'disk_template': instance.disk_template,
578
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
579
  }
580
  if override:
581
    args.update(override)
582
  return _BuildInstanceHookEnv(**args)
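
# Illustrative only (hypothetical call): the optional override dict replaces
# entries of the keyword-argument dict built above (keys such as 'status' or
# 'memory'), not the final INSTANCE_* environment variable names:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": False})
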
583

    
584

    
585
def _AdjustCandidatePool(lu):
586
  """Adjust the candidate pool after node operations.
587

588
  """
589
  mod_list = lu.cfg.MaintainCandidatePool()
590
  if mod_list:
591
    lu.LogInfo("Promoted nodes to master candidate role: %s",
592
               ", ".join(node.name for node in mod_list))
593
    for name in mod_list:
594
      lu.context.ReaddNode(name)
595
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596
  if mc_now > mc_max:
597
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598
               (mc_now, mc_max))
599

    
600

    
601
def _CheckNicsBridgesExist(lu, target_nics, target_node,
602
                               profile=constants.PP_DEFAULT):
603
  """Check that the brigdes needed by a list of nics exist.
604

605
  """
606
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608
                for nic in target_nics]
609
  brlist = [params[constants.NIC_LINK] for params in paramslist
610
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611
  if brlist:
612
    result = lu.rpc.call_bridges_exist(target_node, brlist)
613
    result.Raise()
614
    if not result.data:
615
      raise errors.OpPrereqError("One or more target bridges %s does not"
616
                                 " exist on destination node '%s'" %
617
                                 (brlist, target_node))
618

    
619

    
620
def _CheckInstanceBridgesExist(lu, instance, node=None):
621
  """Check that the brigdes needed by an instance exist.
622

623
  """
624
  if node is None:
625
    node = instance.primary_node
626
  _CheckNicsBridgesExist(lu, instance.nics, node)
627

    
628

    
629
class LUDestroyCluster(NoHooksLU):
630
  """Logical unit for destroying the cluster.
631

632
  """
633
  _OP_REQP = []
634

    
635
  def CheckPrereq(self):
636
    """Check prerequisites.
637

638
    This checks whether the cluster is empty.
639

640
    Any errors are signalled by raising errors.OpPrereqError.
641

642
    """
643
    master = self.cfg.GetMasterNode()
644

    
645
    nodelist = self.cfg.GetNodeList()
646
    if len(nodelist) != 1 or nodelist[0] != master:
647
      raise errors.OpPrereqError("There are still %d node(s) in"
648
                                 " this cluster." % (len(nodelist) - 1))
649
    instancelist = self.cfg.GetInstanceList()
650
    if instancelist:
651
      raise errors.OpPrereqError("There are still %d instance(s) in"
652
                                 " this cluster." % len(instancelist))
653

    
654
  def Exec(self, feedback_fn):
655
    """Destroys the cluster.
656

657
    """
658
    master = self.cfg.GetMasterNode()
659
    result = self.rpc.call_node_stop_master(master, False)
660
    result.Raise()
661
    if not result.data:
662
      raise errors.OpExecError("Could not disable the master role")
663
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
664
    utils.CreateBackup(priv_key)
665
    utils.CreateBackup(pub_key)
666
    return master
667

    
668

    
669
class LUVerifyCluster(LogicalUnit):
670
  """Verifies the cluster status.
671

672
  """
673
  HPATH = "cluster-verify"
674
  HTYPE = constants.HTYPE_CLUSTER
675
  _OP_REQP = ["skip_checks"]
676
  REQ_BGL = False
677

    
678
  def ExpandNames(self):
679
    self.needed_locks = {
680
      locking.LEVEL_NODE: locking.ALL_SET,
681
      locking.LEVEL_INSTANCE: locking.ALL_SET,
682
    }
683
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
684

    
685
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
686
                  node_result, feedback_fn, master_files,
687
                  drbd_map, vg_name):
688
    """Run multiple tests against a node.
689

690
    Test list:
691

692
      - compares ganeti version
693
      - checks vg existance and size > 20G
694
      - checks config file checksum
695
      - checks ssh to other nodes
696

697
    @type nodeinfo: L{objects.Node}
698
    @param nodeinfo: the node to check
699
    @param file_list: required list of files
700
    @param local_cksum: dictionary of local files and their checksums
701
    @param node_result: the results from the node
702
    @param feedback_fn: function used to accumulate results
703
    @param master_files: list of files that only masters should have
704
    @param drbd_map: the used drbd minors for this node, in
705
        form of minor: (instance, must_exist) which correspond to instances
706
        and their running status
707
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
708

709
    """
710
    node = nodeinfo.name
711

    
712
    # main result, node_result should be a non-empty dict
713
    if not node_result or not isinstance(node_result, dict):
714
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
715
      return True
716

    
717
    # compares ganeti version
718
    local_version = constants.PROTOCOL_VERSION
719
    remote_version = node_result.get('version', None)
720
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
721
            len(remote_version) == 2):
722
      feedback_fn("  - ERROR: connection to %s failed" % (node))
723
      return True
724

    
725
    if local_version != remote_version[0]:
726
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
727
                  " node %s %s" % (local_version, node, remote_version[0]))
728
      return True
729

    
730
    # node seems compatible, we can actually try to look into its results
731

    
732
    bad = False
733

    
734
    # full package version
735
    if constants.RELEASE_VERSION != remote_version[1]:
736
      feedback_fn("  - WARNING: software version mismatch: master %s,"
737
                  " node %s %s" %
738
                  (constants.RELEASE_VERSION, node, remote_version[1]))
739

    
740
    # checks vg existence and size > 20G
741
    if vg_name is not None:
742
      vglist = node_result.get(constants.NV_VGLIST, None)
743
      if not vglist:
744
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
745
                        (node,))
746
        bad = True
747
      else:
748
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
749
                                              constants.MIN_VG_SIZE)
750
        if vgstatus:
751
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
752
          bad = True
753

    
754
    # checks config file checksum
755

    
756
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
757
    if not isinstance(remote_cksum, dict):
758
      bad = True
759
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
760
    else:
761
      for file_name in file_list:
762
        node_is_mc = nodeinfo.master_candidate
763
        must_have_file = file_name not in master_files
764
        if file_name not in remote_cksum:
765
          if node_is_mc or must_have_file:
766
            bad = True
767
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
768
        elif remote_cksum[file_name] != local_cksum[file_name]:
769
          if node_is_mc or must_have_file:
770
            bad = True
771
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
772
          else:
773
            # not candidate and this is not a must-have file
774
            bad = True
775
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
776
                        " '%s'" % file_name)
777
        else:
778
          # all good, except non-master/non-must have combination
779
          if not node_is_mc and not must_have_file:
780
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
781
                        " candidates" % file_name)
782

    
783
    # checks ssh to any
784

    
785
    if constants.NV_NODELIST not in node_result:
786
      bad = True
787
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
788
    else:
789
      if node_result[constants.NV_NODELIST]:
790
        bad = True
791
        for node in node_result[constants.NV_NODELIST]:
792
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
793
                          (node, node_result[constants.NV_NODELIST][node]))
794

    
795
    if constants.NV_NODENETTEST not in node_result:
796
      bad = True
797
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
798
    else:
799
      if node_result[constants.NV_NODENETTEST]:
800
        bad = True
801
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
802
        for node in nlist:
803
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
804
                          (node, node_result[constants.NV_NODENETTEST][node]))
805

    
806
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
807
    if isinstance(hyp_result, dict):
808
      for hv_name, hv_result in hyp_result.iteritems():
809
        if hv_result is not None:
810
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
811
                      (hv_name, hv_result))
812

    
813
    # check used drbd list
814
    if vg_name is not None:
815
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
816
      if not isinstance(used_minors, (tuple, list)):
817
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
818
                    str(used_minors))
819
      else:
820
        for minor, (iname, must_exist) in drbd_map.items():
821
          if minor not in used_minors and must_exist:
822
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
823
                        " not active" % (minor, iname))
824
            bad = True
825
        for minor in used_minors:
826
          if minor not in drbd_map:
827
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
828
                        minor)
829
            bad = True
830

    
831
    return bad
832

    
833
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
834
                      node_instance, feedback_fn, n_offline):
835
    """Verify an instance.
836

837
    This function checks to see if the required block devices are
838
    available on the instance's node.
839

840
    """
841
    bad = False
842

    
843
    node_current = instanceconfig.primary_node
844

    
845
    node_vol_should = {}
846
    instanceconfig.MapLVsByNode(node_vol_should)
847

    
848
    for node in node_vol_should:
849
      if node in n_offline:
850
        # ignore missing volumes on offline nodes
851
        continue
852
      for volume in node_vol_should[node]:
853
        if node not in node_vol_is or volume not in node_vol_is[node]:
854
          feedback_fn("  - ERROR: volume %s missing on node %s" %
855
                          (volume, node))
856
          bad = True
857

    
858
    if instanceconfig.admin_up:
859
      if ((node_current not in node_instance or
860
          not instance in node_instance[node_current]) and
861
          node_current not in n_offline):
862
        feedback_fn("  - ERROR: instance %s not running on node %s" %
863
                        (instance, node_current))
864
        bad = True
865

    
866
    for node in node_instance:
867
      if node != node_current:
868
        if instance in node_instance[node]:
869
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
870
                          (instance, node))
871
          bad = True
872

    
873
    return bad
874

    
875
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
876
    """Verify if there are any unknown volumes in the cluster.
877

878
    The .os, .swap and backup volumes are ignored. All other volumes are
879
    reported as unknown.
880

881
    """
882
    bad = False
883

    
884
    for node in node_vol_is:
885
      for volume in node_vol_is[node]:
886
        if node not in node_vol_should or volume not in node_vol_should[node]:
887
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
888
                      (volume, node))
889
          bad = True
890
    return bad
891

    
892
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
893
    """Verify the list of running instances.
894

895
    This checks what instances are running but unknown to the cluster.
896

897
    """
898
    bad = False
899
    for node in node_instance:
900
      for runninginstance in node_instance[node]:
901
        if runninginstance not in instancelist:
902
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
903
                          (runninginstance, node))
904
          bad = True
905
    return bad
906

    
907
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
908
    """Verify N+1 Memory Resilience.
909

910
    Check that if one single node dies we can still start all the instances it
911
    was primary for.
912

913
    """
914
    bad = False
915

    
916
    for node, nodeinfo in node_info.iteritems():
917
      # This code checks that every node which is now listed as secondary has
918
      # enough memory to host all instances it is supposed to, should a single
919
      # other node in the cluster fail.
920
      # FIXME: not ready for failover to an arbitrary node
921
      # FIXME: does not support file-backed instances
922
      # WARNING: we currently take into account down instances as well as up
923
      # ones, considering that even if they're down someone might want to start
924
      # them even in the event of a node failure.
925
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
926
        needed_mem = 0
927
        for instance in instances:
928
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
929
          if bep[constants.BE_AUTO_BALANCE]:
930
            needed_mem += bep[constants.BE_MEMORY]
931
        if nodeinfo['mfree'] < needed_mem:
932
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
933
                      " failovers should node %s fail" % (node, prinode))
934
          bad = True
935
    return bad
936

    
937
  def CheckPrereq(self):
938
    """Check prerequisites.
939

940
    Transform the list of checks we're going to skip into a set and check that
941
    all its members are valid.
942

943
    """
944
    self.skip_set = frozenset(self.op.skip_checks)
945
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
946
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
947

    
948
  def BuildHooksEnv(self):
949
    """Build hooks env.
950

951
    Cluster-Verify hooks are run only in the post phase; their failure makes
952
    their output appear in the verify output and the verification fail.
953

954
    """
955
    all_nodes = self.cfg.GetNodeList()
956
    env = {
957
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
958
      }
959
    for node in self.cfg.GetAllNodesInfo().values():
960
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
961

    
962
    return env, [], all_nodes
963

    
964
  def Exec(self, feedback_fn):
965
    """Verify integrity of cluster, performing various test on nodes.
966

967
    """
968
    bad = False
969
    feedback_fn("* Verifying global settings")
970
    for msg in self.cfg.VerifyConfig():
971
      feedback_fn("  - ERROR: %s" % msg)
972

    
973
    vg_name = self.cfg.GetVGName()
974
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
975
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
976
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
977
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
978
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
979
                        for iname in instancelist)
980
    i_non_redundant = [] # Non redundant instances
981
    i_non_a_balanced = [] # Non auto-balanced instances
982
    n_offline = [] # List of offline nodes
983
    n_drained = [] # List of nodes being drained
984
    node_volume = {}
985
    node_instance = {}
986
    node_info = {}
987
    instance_cfg = {}
988

    
989
    # FIXME: verify OS list
990
    # do local checksums
991
    master_files = [constants.CLUSTER_CONF_FILE]
992

    
993
    file_names = ssconf.SimpleStore().GetFileList()
994
    file_names.append(constants.SSL_CERT_FILE)
995
    file_names.append(constants.RAPI_CERT_FILE)
996
    file_names.extend(master_files)
997

    
998
    local_checksums = utils.FingerprintFiles(file_names)
999

    
1000
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1001
    node_verify_param = {
1002
      constants.NV_FILELIST: file_names,
1003
      constants.NV_NODELIST: [node.name for node in nodeinfo
1004
                              if not node.offline],
1005
      constants.NV_HYPERVISOR: hypervisors,
1006
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1007
                                  node.secondary_ip) for node in nodeinfo
1008
                                 if not node.offline],
1009
      constants.NV_INSTANCELIST: hypervisors,
1010
      constants.NV_VERSION: None,
1011
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1012
      }
1013
    if vg_name is not None:
1014
      node_verify_param[constants.NV_VGLIST] = None
1015
      node_verify_param[constants.NV_LVLIST] = vg_name
1016
      node_verify_param[constants.NV_DRBDLIST] = None
1017
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1018
                                           self.cfg.GetClusterName())
1019

    
1020
    cluster = self.cfg.GetClusterInfo()
1021
    master_node = self.cfg.GetMasterNode()
1022
    all_drbd_map = self.cfg.ComputeDRBDMap()
1023

    
1024
    for node_i in nodeinfo:
1025
      node = node_i.name
1026
      nresult = all_nvinfo[node].data
1027

    
1028
      if node_i.offline:
1029
        feedback_fn("* Skipping offline node %s" % (node,))
1030
        n_offline.append(node)
1031
        continue
1032

    
1033
      if node == master_node:
1034
        ntype = "master"
1035
      elif node_i.master_candidate:
1036
        ntype = "master candidate"
1037
      elif node_i.drained:
1038
        ntype = "drained"
1039
        n_drained.append(node)
1040
      else:
1041
        ntype = "regular"
1042
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1043

    
1044
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1045
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1046
        bad = True
1047
        continue
1048

    
1049
      node_drbd = {}
1050
      for minor, instance in all_drbd_map[node].items():
1051
        if instance not in instanceinfo:
1052
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053
                      instance)
1054
          # ghost instance should not be running, but otherwise we
1055
          # don't give double warnings (both ghost instance and
1056
          # unallocated minor in use)
1057
          node_drbd[minor] = (instance, False)
1058
        else:
1059
          instance = instanceinfo[instance]
1060
          node_drbd[minor] = (instance.name, instance.admin_up)
1061
      result = self._VerifyNode(node_i, file_names, local_checksums,
1062
                                nresult, feedback_fn, master_files,
1063
                                node_drbd, vg_name)
1064
      bad = bad or result
1065

    
1066
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067
      if vg_name is None:
1068
        node_volume[node] = {}
1069
      elif isinstance(lvdata, basestring):
1070
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071
                    (node, utils.SafeEncode(lvdata)))
1072
        bad = True
1073
        node_volume[node] = {}
1074
      elif not isinstance(lvdata, dict):
1075
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076
        bad = True
1077
        continue
1078
      else:
1079
        node_volume[node] = lvdata
1080

    
1081
      # node_instance
1082
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1083
      if not isinstance(idata, list):
1084
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085
                    (node,))
1086
        bad = True
1087
        continue
1088

    
1089
      node_instance[node] = idata
1090

    
1091
      # node_info
1092
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093
      if not isinstance(nodeinfo, dict):
1094
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095
        bad = True
1096
        continue
1097

    
1098
      try:
1099
        node_info[node] = {
1100
          "mfree": int(nodeinfo['memory_free']),
1101
          "pinst": [],
1102
          "sinst": [],
1103
          # dictionary holding all instances this node is secondary for,
1104
          # grouped by their primary node. Each key is a cluster node, and each
1105
          # value is a list of instances which have the key as primary and the
1106
          # current node as secondary.  this is handy to calculate N+1 memory
1107
          # availability if you can only failover from a primary to its
1108
          # secondary.
1109
          "sinst-by-pnode": {},
1110
        }
1111
        # FIXME: devise a free space model for file based instances as well
1112
        if vg_name is not None:
1113
          if (constants.NV_VGLIST not in nresult or
1114
              vg_name not in nresult[constants.NV_VGLIST]):
1115
            feedback_fn("  - ERROR: node %s didn't return data for the"
1116
                        " volume group '%s' - it is either missing or broken" %
1117
                        (node, vg_name))
1118
            bad = True
1119
            continue
1120
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121
      except (ValueError, KeyError):
1122
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123
                    " from node %s" % (node,))
1124
        bad = True
1125
        continue
1126

    
1127
    node_vol_should = {}
1128

    
1129
    for instance in instancelist:
1130
      feedback_fn("* Verifying instance %s" % instance)
1131
      inst_config = instanceinfo[instance]
1132
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1133
                                     node_instance, feedback_fn, n_offline)
1134
      bad = bad or result
1135
      inst_nodes_offline = []
1136

    
1137
      inst_config.MapLVsByNode(node_vol_should)
1138

    
1139
      instance_cfg[instance] = inst_config
1140

    
1141
      pnode = inst_config.primary_node
1142
      if pnode in node_info:
1143
        node_info[pnode]['pinst'].append(instance)
1144
      elif pnode not in n_offline:
1145
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1146
                    " %s failed" % (instance, pnode))
1147
        bad = True
1148

    
1149
      if pnode in n_offline:
1150
        inst_nodes_offline.append(pnode)
1151

    
1152
      # If the instance is non-redundant we cannot survive losing its primary
1153
      # node, so we are not N+1 compliant. On the other hand we have no disk
1154
      # templates with more than one secondary so that situation is not well
1155
      # supported either.
1156
      # FIXME: does not support file-backed instances
1157
      if len(inst_config.secondary_nodes) == 0:
1158
        i_non_redundant.append(instance)
1159
      elif len(inst_config.secondary_nodes) > 1:
1160
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161
                    % instance)
1162

    
1163
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164
        i_non_a_balanced.append(instance)
1165

    
1166
      for snode in inst_config.secondary_nodes:
1167
        if snode in node_info:
1168
          node_info[snode]['sinst'].append(instance)
1169
          if pnode not in node_info[snode]['sinst-by-pnode']:
1170
            node_info[snode]['sinst-by-pnode'][pnode] = []
1171
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172
        elif snode not in n_offline:
1173
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174
                      " %s failed" % (instance, snode))
1175
          bad = True
1176
        if snode in n_offline:
1177
          inst_nodes_offline.append(snode)
1178

    
1179
      if inst_nodes_offline:
1180
        # warn that the instance lives on offline nodes, and set bad=True
1181
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182
                    ", ".join(inst_nodes_offline))
1183
        bad = True
1184

    
1185
    feedback_fn("* Verifying orphan volumes")
1186
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187
                                       feedback_fn)
1188
    bad = bad or result
1189

    
1190
    feedback_fn("* Verifying remaining instances")
1191
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1192
                                         feedback_fn)
1193
    bad = bad or result
1194

    
1195
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196
      feedback_fn("* Verifying N+1 Memory redundancy")
1197
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198
      bad = bad or result
1199

    
1200
    feedback_fn("* Other Notes")
1201
    if i_non_redundant:
1202
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203
                  % len(i_non_redundant))
1204

    
1205
    if i_non_a_balanced:
1206
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207
                  % len(i_non_a_balanced))
1208

    
1209
    if n_offline:
1210
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211

    
1212
    if n_drained:
1213
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214

    
1215
    return not bad
1216

    
1217
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218
    """Analize the post-hooks' result
1219

1220
    This method analyzes the hook result, handles it, and sends some
1221
    nicely-formatted feedback back to the user.
1222

1223
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225
    @param hooks_results: the results of the multi-node hooks rpc call
1226
    @param feedback_fn: function used to send feedback back to the caller
1227
    @param lu_result: previous Exec result
1228
    @return: the new Exec result, based on the previous result
1229
        and hook results
1230

1231
    """
1232
    # We only really run POST phase hooks, and are only interested in
1233
    # their results
1234
    if phase == constants.HOOKS_PHASE_POST:
1235
      # Used to change hooks' output to proper indentation
1236
      indent_re = re.compile('^', re.M)
1237
      feedback_fn("* Hooks Results")
1238
      if not hooks_results:
1239
        feedback_fn("  - ERROR: general communication failure")
1240
        lu_result = 1
1241
      else:
1242
        for node_name in hooks_results:
1243
          show_node_header = True
1244
          res = hooks_results[node_name]
1245
          if res.failed or res.data is False or not isinstance(res.data, list):
1246
            if res.offline:
1247
              # no need to warn or set fail return value
1248
              continue
1249
            feedback_fn("    Communication failure in hooks execution")
1250
            lu_result = 1
1251
            continue
1252
          for script, hkr, output in res.data:
1253
            if hkr == constants.HKR_FAIL:
1254
              # The node header is only shown once, if there are
1255
              # failing hooks on that node
1256
              if show_node_header:
1257
                feedback_fn("  Node %s:" % node_name)
1258
                show_node_header = False
1259
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1260
              output = indent_re.sub('      ', output)
1261
              feedback_fn("%s" % output)
1262
              lu_result = 1
1263

    
1264
      return lu_result
1265

    
1266

    
1267
class LUVerifyDisks(NoHooksLU):
1268
  """Verifies the cluster disks status.
1269

1270
  """
1271
  _OP_REQP = []
1272
  REQ_BGL = False
1273

    
1274
  def ExpandNames(self):
1275
    self.needed_locks = {
1276
      locking.LEVEL_NODE: locking.ALL_SET,
1277
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1278
    }
1279
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280

    
1281
  def CheckPrereq(self):
1282
    """Check prerequisites.
1283

1284
    This has no prerequisites.
1285

1286
    """
1287
    pass
1288

    
1289
  def Exec(self, feedback_fn):
1290
    """Verify integrity of cluster disks.
1291

1292
    """
1293
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1294

    
1295
    vg_name = self.cfg.GetVGName()
1296
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1297
    instances = [self.cfg.GetInstanceInfo(name)
1298
                 for name in self.cfg.GetInstanceList()]
1299

    
1300
    nv_dict = {}
1301
    for inst in instances:
1302
      inst_lvs = {}
1303
      if (not inst.admin_up or
1304
          inst.disk_template not in constants.DTS_NET_MIRROR):
1305
        continue
1306
      inst.MapLVsByNode(inst_lvs)
1307
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1308
      for node, vol_list in inst_lvs.iteritems():
1309
        for vol in vol_list:
1310
          nv_dict[(node, vol)] = inst
1311

    
1312
    if not nv_dict:
1313
      return result
1314

    
1315
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1316

    
1317
    to_act = set()
1318
    for node in nodes:
1319
      # node_volume
1320
      lvs = node_lvs[node]
1321
      if lvs.failed:
1322
        if not lvs.offline:
1323
          self.LogWarning("Connection to node %s failed: %s" %
1324
                          (node, lvs.data))
1325
        continue
1326
      lvs = lvs.data
1327
      if isinstance(lvs, basestring):
1328
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1329
        res_nlvm[node] = lvs
1330
        continue
1331
      elif not isinstance(lvs, dict):
1332
        logging.warning("Connection to node %s failed or invalid data"
1333
                        " returned", node)
1334
        res_nodes.append(node)
1335
        continue
1336

    
1337
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1338
        inst = nv_dict.pop((node, lv_name), None)
1339
        if (not lv_online and inst is not None
1340
            and inst.name not in res_instances):
1341
          res_instances.append(inst.name)
1342

    
1343
    # any leftover items in nv_dict are missing LVs, let's arrange the
1344
    # data better
1345
    for key, inst in nv_dict.iteritems():
1346
      if inst.name not in res_missing:
1347
        res_missing[inst.name] = []
1348
      res_missing[inst.name].append(key)
1349

    
1350
    return result
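
    # The value returned above is a 4-tuple (nodes, node_lvm_errors,
    # instances, missing_lvs).  A purely hypothetical example result for a
    # cluster with one unreachable node, one LVM scan error and two affected
    # instances:
    #
    #   (["node3.example.com"],                  # nodes that failed the query
    #    {"node2.example.com": "volume group missing"},  # per-node LVM errors
    #    ["inst1.example.com"],                  # instances with inactive LVs
    #    {"inst2.example.com":                   # instances with missing LVs,
    #       [("node1.example.com", "xenvg/disk0")]})  # as (node, volume) pairs
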
1351

    
1352

    
1353
class LURenameCluster(LogicalUnit):
1354
  """Rename the cluster.
1355

1356
  """
1357
  HPATH = "cluster-rename"
1358
  HTYPE = constants.HTYPE_CLUSTER
1359
  _OP_REQP = ["name"]
1360

    
1361
  def BuildHooksEnv(self):
1362
    """Build hooks env.
1363

1364
    """
1365
    env = {
1366
      "OP_TARGET": self.cfg.GetClusterName(),
1367
      "NEW_NAME": self.op.name,
1368
      }
1369
    mn = self.cfg.GetMasterNode()
1370
    return env, [mn], [mn]
1371

    
1372
  def CheckPrereq(self):
1373
    """Verify that the passed name is a valid one.
1374

1375
    """
1376
    hostname = utils.HostInfo(self.op.name)
1377

    
1378
    new_name = hostname.name
1379
    self.ip = new_ip = hostname.ip
1380
    old_name = self.cfg.GetClusterName()
1381
    old_ip = self.cfg.GetMasterIP()
1382
    if new_name == old_name and new_ip == old_ip:
1383
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384
                                 " cluster has changed")
1385
    if new_ip != old_ip:
1386
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388
                                   " reachable on the network. Aborting." %
1389
                                   new_ip)
1390

    
1391
    self.op.name = new_name
1392

    
1393
  def Exec(self, feedback_fn):
1394
    """Rename the cluster.
1395

1396
    """
1397
    clustername = self.op.name
1398
    ip = self.ip
1399

    
1400
    # shutdown the master IP
1401
    master = self.cfg.GetMasterNode()
1402
    result = self.rpc.call_node_stop_master(master, False)
1403
    if result.failed or not result.data:
1404
      raise errors.OpExecError("Could not disable the master role")
1405

    
1406
    try:
1407
      cluster = self.cfg.GetClusterInfo()
1408
      cluster.cluster_name = clustername
1409
      cluster.master_ip = ip
1410
      self.cfg.Update(cluster)
1411

    
1412
      # update the known hosts file
1413
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1414
      node_list = self.cfg.GetNodeList()
1415
      try:
1416
        node_list.remove(master)
1417
      except ValueError:
1418
        pass
1419
      result = self.rpc.call_upload_file(node_list,
1420
                                         constants.SSH_KNOWN_HOSTS_FILE)
1421
      for to_node, to_result in result.iteritems():
1422
        msg = to_result.RemoteFailMsg()
1423
        if msg:
1424
          msg = ("Copy of file %s to node %s failed: %s" %
1425
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1426
          self.proc.LogWarning(msg)
1427

    
1428
    finally:
1429
      result = self.rpc.call_node_start_master(master, False)
1430
      if result.failed or not result.data:
1431
        self.LogWarning("Could not re-enable the master role on"
1432
                        " the master, please restart manually.")
1433

    
1434

    
1435
def _RecursiveCheckIfLVMBased(disk):
1436
  """Check if the given disk or its children are lvm-based.
1437

1438
  @type disk: L{objects.Disk}
1439
  @param disk: the disk to check
1440
  @rtype: boolean
1441
  @return: boolean indicating whether a LD_LV dev_type was found or not
1442

1443
  """
1444
  if disk.children:
1445
    for chdisk in disk.children:
1446
      if _RecursiveCheckIfLVMBased(chdisk):
1447
        return True
1448
  return disk.dev_type == constants.LD_LV
1449

    
1450

    
1451
class LUSetClusterParams(LogicalUnit):
1452
  """Change the parameters of the cluster.
1453

1454
  """
1455
  HPATH = "cluster-modify"
1456
  HTYPE = constants.HTYPE_CLUSTER
1457
  _OP_REQP = []
1458
  REQ_BGL = False
1459

    
1460
  def CheckArguments(self):
1461
    """Check parameters
1462

1463
    """
1464
    if not hasattr(self.op, "candidate_pool_size"):
1465
      self.op.candidate_pool_size = None
1466
    if self.op.candidate_pool_size is not None:
1467
      try:
1468
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469
      except (ValueError, TypeError), err:
1470
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1471
                                   str(err))
1472
      if self.op.candidate_pool_size < 1:
1473
        raise errors.OpPrereqError("At least one master candidate needed")
1474

    
1475
  def ExpandNames(self):
1476
    # FIXME: in the future maybe other cluster params won't require checking on
1477
    # all nodes to be modified.
1478
    self.needed_locks = {
1479
      locking.LEVEL_NODE: locking.ALL_SET,
1480
    }
1481
    self.share_locks[locking.LEVEL_NODE] = 1
1482

    
1483
  def BuildHooksEnv(self):
1484
    """Build hooks env.
1485

1486
    """
1487
    env = {
1488
      "OP_TARGET": self.cfg.GetClusterName(),
1489
      "NEW_VG_NAME": self.op.vg_name,
1490
      }
1491
    mn = self.cfg.GetMasterNode()
1492
    return env, [mn], [mn]
1493

    
1494
  def CheckPrereq(self):
1495
    """Check prerequisites.
1496

1497
    This checks whether the given params don't conflict and
1498
    if the given volume group is valid.
1499

1500
    """
1501
    if self.op.vg_name is not None and not self.op.vg_name:
1502
      instances = self.cfg.GetAllInstancesInfo().values()
1503
      for inst in instances:
1504
        for disk in inst.disks:
1505
          if _RecursiveCheckIfLVMBased(disk):
1506
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1507
                                       " lvm-based instances exist")
1508

    
1509
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1510

    
1511
    # if vg_name not None, checks given volume group on all nodes
1512
    if self.op.vg_name:
1513
      vglist = self.rpc.call_vg_list(node_list)
1514
      for node in node_list:
1515
        if vglist[node].failed:
1516
          # ignoring down node
1517
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1518
          continue
1519
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1520
                                              self.op.vg_name,
1521
                                              constants.MIN_VG_SIZE)
1522
        if vgstatus:
1523
          raise errors.OpPrereqError("Error on node '%s': %s" %
1524
                                     (node, vgstatus))
1525

    
1526
    self.cluster = cluster = self.cfg.GetClusterInfo()
1527
    # validate params changes
1528
    if self.op.beparams:
1529
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1530
      self.new_beparams = objects.FillDict(
1531
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1532

    
1533
    if self.op.nicparams:
1534
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1535
      self.new_nicparams = objects.FillDict(
1536
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1537
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1538

    
1539
    # hypervisor list/parameters
1540
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1541
    if self.op.hvparams:
1542
      if not isinstance(self.op.hvparams, dict):
1543
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1544
      for hv_name, hv_dict in self.op.hvparams.items():
1545
        if hv_name not in self.new_hvparams:
1546
          self.new_hvparams[hv_name] = hv_dict
1547
        else:
1548
          self.new_hvparams[hv_name].update(hv_dict)
1549

    
1550
    if self.op.enabled_hypervisors is not None:
1551
      self.hv_list = self.op.enabled_hypervisors
1552
    else:
1553
      self.hv_list = cluster.enabled_hypervisors
1554

    
1555
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1556
      # either the enabled list has changed, or the parameters have, validate
1557
      for hv_name, hv_params in self.new_hvparams.items():
1558
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1559
            (self.op.enabled_hypervisors and
1560
             hv_name in self.op.enabled_hypervisors)):
1561
          # either this is a new hypervisor, or its parameters have changed
1562
          hv_class = hypervisor.GetHypervisor(hv_name)
1563
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1564
          hv_class.CheckParameterSyntax(hv_params)
1565
          _CheckHVParams(self, node_list, hv_name, hv_params)
1566

    
1567
  def Exec(self, feedback_fn):
1568
    """Change the parameters of the cluster.
1569

1570
    """
1571
    if self.op.vg_name is not None:
1572
      new_volume = self.op.vg_name
1573
      if not new_volume:
1574
        new_volume = None
1575
      if new_volume != self.cfg.GetVGName():
1576
        self.cfg.SetVGName(new_volume)
1577
      else:
1578
        feedback_fn("Cluster LVM configuration already in desired"
1579
                    " state, not changing")
1580
    if self.op.hvparams:
1581
      self.cluster.hvparams = self.new_hvparams
1582
    if self.op.enabled_hypervisors is not None:
1583
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1584
    if self.op.beparams:
1585
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1586
    if self.op.nicparams:
1587
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1588

    
1589
    if self.op.candidate_pool_size is not None:
1590
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1591

    
1592
    self.cfg.Update(self.cluster)
1593

    
1594
    # we want to update nodes after the cluster so that if any errors
1595
    # happen, we have recorded and saved the cluster info
1596
    if self.op.candidate_pool_size is not None:
1597
      _AdjustCandidatePool(self)
1598

    
1599

    
1600
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1601
  """Distribute additional files which are part of the cluster configuration.
1602

1603
  ConfigWriter takes care of distributing the config and ssconf files, but
1604
  there are more files which should be distributed to all nodes. This function
1605
  makes sure those are copied.
1606

1607
  @param lu: calling logical unit
1608
  @param additional_nodes: list of nodes not in the config to distribute to
1609

1610
  """
1611
  # 1. Gather target nodes
1612
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1613
  dist_nodes = lu.cfg.GetNodeList()
1614
  if additional_nodes is not None:
1615
    dist_nodes.extend(additional_nodes)
1616
  if myself.name in dist_nodes:
1617
    dist_nodes.remove(myself.name)
1618
  # 2. Gather files to distribute
1619
  dist_files = set([constants.ETC_HOSTS,
1620
                    constants.SSH_KNOWN_HOSTS_FILE,
1621
                    constants.RAPI_CERT_FILE,
1622
                    constants.RAPI_USERS_FILE,
1623
                   ])
1624

    
1625
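  # each enabled hypervisor can contribute its own ancillary files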
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

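    # wait before the next poll, but never longer than 60 seconds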
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
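  # pick which field of the blockdev_find status to test: index 6 holds the
  # local-disk (ldisk) status, index 5 the overall degraded flag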
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    mc_now, _ = self.cfg.GetMasterCandidateStats()
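    # the new node becomes a master candidate only if the candidate pool
    # is not already full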
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

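    # read the key material locally; the contents are passed to the new
    # node via the node_add RPC below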
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot transfer ssh keys to the"
                               " new node: %s" % msg)

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
                        for hypervisor in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                    [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored (they
  are only logged as warnings).

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
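        # failures on other nodes always count; on the primary node they
        # only count if ignore_primary is not set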
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if not remote_info.data:
2918
      _CheckNodeFreeMemory(self, instance.primary_node,
2919
                           "starting instance %s" % instance.name,
2920
                           bep[constants.BE_MEMORY], instance.hypervisor)
2921

    
2922
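  # Exec marks the instance 'up' in the configuration first, activates its
  # disks, and only then asks the primary node to start it; if the start RPC
  # fails, the disks are shut down again before the error is propagated.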
  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


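# For soft and hard reboots the work is delegated to the hypervisor on the
# primary node; a "full" reboot is emulated by shutting the instance down,
# restarting its disks and starting it again.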
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

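  # The instance is marked 'down' in the configuration before the shutdown
  # RPC is issued; a failure of the RPC is only logged as a warning so that
  # the disks still get deactivated below.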
  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

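  # Reinstallation optionally switches the instance to a new OS (recorded in
  # the configuration first), then runs the instance OS create scripts with
  # the disks activated (the extra True argument to call_instance_os_add
  # presumably flags this as a reinstall); the disks are shut down afterwards.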
  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

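  # The rename is performed in several steps: the configuration entry and the
  # instance lock are renamed first, then (for file-based storage) the storage
  # directory is moved on the primary node, and finally the OS rename script
  # is run inside a start-disks/shutdown-disks pair.  Failures after the
  # configuration rename are reported, but the new name is kept.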
  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

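  # With ignore_failures set, problems while shutting down the instance or
  # removing its disks are reported as warnings and the removal continues;
  # otherwise the first failure aborts the operation.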
  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

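  # Exec first computes the list of instances to report on, then gathers
  # live data from the nodes only when dynamic fields (oper_state, oper_ram,
  # status) were requested, and finally resolves every requested output
  # field per instance.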
  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
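      # Each requested output field is resolved below: plain attributes come
      # straight from the instance object, per-parameter "hv/*" and "be/*"
      # fields from the filled hypervisor/backend dicts, and the regex-style
      # fields (disk.*, nic.*) are decoded from the match groups.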
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

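  # Failover shuts the instance down on the current primary node (unless the
  # node is unreachable and ignore_consistency is set), makes the old
  # secondary the new primary in the configuration, and then restarts the
  # instance there, but only if it was marked as up.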
  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


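# Live migration keeps the instance running: the DRBD disks are temporarily
# switched to dual-primary ("dual-master") mode so the hypervisor can move
# the running instance, and afterwards the old primary is demoted again.
# When invoked with cleanup=True the LU is meant to repair the disk and
# configuration state left behind by a previously failed migration (see
# _ExecCleanup below) rather than perform a new one.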
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

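  # The helpers below drive the DRBD devices on both nodes over RPC:
  # _WaitUntilSync polls until resynchronisation is complete, _EnsureSecondary
  # demotes a node's disks, and _GoStandalone/_GoReconnect disconnect and
  # reconnect the disk network in single- or dual-master mode.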
  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()