1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
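  # LUs that are safe to run without holding the BGL exclusively override
  # REQ_BGL with False and implement ExpandNames (and, if needed,
  # DeclareLocks); see the notes in those methods below.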
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
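    # By default each level's locks are acquired exclusively (value 0);
    # an LU that wants shared locks at a level sets that value to 1
    # (see ExpandNames)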
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these separately is better because:
118

119
      - ExpandNames is left as a purely lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
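
    For example, an LU that needs the node locks of its instances can do
    (this is the pattern implemented by L{_LockInstancesNodes} below)::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()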
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_' as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If the hook should run on no nodes, an empty list (and not None) should
    be returned.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instances' nodes, or
295
    to just lock primary or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396
  return wanted
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _CheckNodeNotDrained(lu, node):
445
  """Ensure that a given node is not drained.
446

447
  @param lu: the LU on behalf of which we make the check
448
  @param node: the node to check
449
  @raise errors.OpPrereqError: if the node is drained
450

451
  """
452
  if lu.cfg.GetNodeInfo(node).drained:
453
    raise errors.OpPrereqError("Can't use drained node %s" % node)
454

    
455

    
456
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457
                          memory, vcpus, nics, disk_template, disks):
458
  """Builds instance related env variables for hooks
459

460
  This builds the hook environment from individual variables.
461

462
  @type name: string
463
  @param name: the name of the instance
464
  @type primary_node: string
465
  @param primary_node: the name of the instance's primary node
466
  @type secondary_nodes: list
467
  @param secondary_nodes: list of secondary nodes as strings
468
  @type os_type: string
469
  @param os_type: the name of the instance's OS
470
  @type status: boolean
471
  @param status: the should_run status of the instance
472
  @type memory: string
473
  @param memory: the memory size of the instance
474
  @type vcpus: string
475
  @param vcpus: the count of VCPUs the instance has
476
  @type nics: list
477
  @param nics: list of tuples (ip, bridge, mac) representing
478
      the NICs the instance has
479
  @type disk_template: string
480
  @param disk_template: the disk template of the instance
481
  @type disks: list
482
  @param disks: the list of (size, mode) pairs
483
  @rtype: dict
484
  @return: the hook environment for this instance
485

486
  """
487
  if status:
488
    str_status = "up"
489
  else:
490
    str_status = "down"
491
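  # Note: the hooks runner adds the GANETI_ prefix to all of these keys
  # before exposing them to the hook scripts (see LogicalUnit.BuildHooksEnv)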
  env = {
492
    "OP_TARGET": name,
493
    "INSTANCE_NAME": name,
494
    "INSTANCE_PRIMARY": primary_node,
495
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496
    "INSTANCE_OS_TYPE": os_type,
497
    "INSTANCE_STATUS": str_status,
498
    "INSTANCE_MEMORY": memory,
499
    "INSTANCE_VCPUS": vcpus,
500
    "INSTANCE_DISK_TEMPLATE": disk_template,
501
  }
502

    
503
  if nics:
504
    nic_count = len(nics)
505
    for idx, (ip, bridge, mac) in enumerate(nics):
506
      if ip is None:
507
        ip = ""
508
      env["INSTANCE_NIC%d_IP" % idx] = ip
509
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510
      env["INSTANCE_NIC%d_MAC" % idx] = mac
511
  else:
512
    nic_count = 0
513

    
514
  env["INSTANCE_NIC_COUNT"] = nic_count
515

    
516
  if disks:
517
    disk_count = len(disks)
518
    for idx, (size, mode) in enumerate(disks):
519
      env["INSTANCE_DISK%d_SIZE" % idx] = size
520
      env["INSTANCE_DISK%d_MODE" % idx] = mode
521
  else:
522
    disk_count = 0
523

    
524
  env["INSTANCE_DISK_COUNT"] = disk_count
525

    
526
  return env
527

    
528

    
529
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530
  """Builds instance related env variables for hooks from an object.
531

532
  @type lu: L{LogicalUnit}
533
  @param lu: the logical unit on whose behalf we execute
534
  @type instance: L{objects.Instance}
535
  @param instance: the instance for which we should build the
536
      environment
537
  @type override: dict
538
  @param override: dictionary with key/values that will override
539
      our values
540
  @rtype: dict
541
  @return: the hook environment dictionary
542

543
  """
544
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
545
  args = {
546
    'name': instance.name,
547
    'primary_node': instance.primary_node,
548
    'secondary_nodes': instance.secondary_nodes,
549
    'os_type': instance.os,
550
    'status': instance.admin_up,
551
    'memory': bep[constants.BE_MEMORY],
552
    'vcpus': bep[constants.BE_VCPUS],
553
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554
    'disk_template': instance.disk_template,
555
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
556
  }
557
  if override:
558
    args.update(override)
559
  return _BuildInstanceHookEnv(**args)
560

    
561

    
562
def _AdjustCandidatePool(lu):
563
  """Adjust the candidate pool after node operations.
564

565
  """
566
  mod_list = lu.cfg.MaintainCandidatePool()
567
  if mod_list:
568
    lu.LogInfo("Promoted nodes to master candidate role: %s",
569
               ", ".join(node.name for node in mod_list))
570
    for name in mod_list:
571
      lu.context.ReaddNode(name)
572
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573
  if mc_now > mc_max:
574
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575
               (mc_now, mc_max))
576

    
577

    
578
def _CheckInstanceBridgesExist(lu, instance):
579
  """Check that the brigdes needed by an instance exist.
580

581
  """
582
  # check bridges existence
583
  brlist = [nic.bridge for nic in instance.nics]
584
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585
  result.Raise()
586
  if not result.data:
587
    raise errors.OpPrereqError("One or more target bridges %s does not"
588
                               " exist on destination node '%s'" %
589
                               (brlist, instance.primary_node))
590

    
591

    
592
class LUDestroyCluster(NoHooksLU):
593
  """Logical unit for destroying the cluster.
594

595
  """
596
  _OP_REQP = []
597

    
598
  def CheckPrereq(self):
599
    """Check prerequisites.
600

601
    This checks whether the cluster is empty.
602

603
    Any errors are signalled by raising errors.OpPrereqError.
604

605
    """
606
    master = self.cfg.GetMasterNode()
607

    
608
    nodelist = self.cfg.GetNodeList()
609
    if len(nodelist) != 1 or nodelist[0] != master:
610
      raise errors.OpPrereqError("There are still %d node(s) in"
611
                                 " this cluster." % (len(nodelist) - 1))
612
    instancelist = self.cfg.GetInstanceList()
613
    if instancelist:
614
      raise errors.OpPrereqError("There are still %d instance(s) in"
615
                                 " this cluster." % len(instancelist))
616

    
617
  def Exec(self, feedback_fn):
618
    """Destroys the cluster.
619

620
    """
621
    master = self.cfg.GetMasterNode()
622
    result = self.rpc.call_node_stop_master(master, False)
623
    result.Raise()
624
    if not result.data:
625
      raise errors.OpExecError("Could not disable the master role")
626
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627
    utils.CreateBackup(priv_key)
628
    utils.CreateBackup(pub_key)
629
    return master
630

    
631

    
632
class LUVerifyCluster(LogicalUnit):
633
  """Verifies the cluster status.
634

635
  """
636
  HPATH = "cluster-verify"
637
  HTYPE = constants.HTYPE_CLUSTER
638
  _OP_REQP = ["skip_checks"]
639
  REQ_BGL = False
640

    
641
  def ExpandNames(self):
642
    self.needed_locks = {
643
      locking.LEVEL_NODE: locking.ALL_SET,
644
      locking.LEVEL_INSTANCE: locking.ALL_SET,
645
    }
646
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
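    # cluster verification only reads the configuration, so shared locks
    # at all levels are sufficient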
647

    
648
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649
                  node_result, feedback_fn, master_files,
650
                  drbd_map):
651
    """Run multiple tests against a node.
652

653
    Test list:
654

655
      - compares ganeti version
656
      - checks vg existence and size > 20G
657
      - checks config file checksum
658
      - checks ssh to other nodes
659

660
    @type nodeinfo: L{objects.Node}
661
    @param nodeinfo: the node to check
662
    @param file_list: required list of files
663
    @param local_cksum: dictionary of local files and their checksums
664
    @param node_result: the results from the node
665
    @param feedback_fn: function used to accumulate results
666
    @param master_files: list of files that only masters should have
667
    @param drbd_map: the used drbd minors for this node, in
668
        the form of minor: (instance, must_exist), which correspond to instances
669
        and their running status
670

671
    """
672
    node = nodeinfo.name
673

    
674
    # main result, node_result should be a non-empty dict
675
    if not node_result or not isinstance(node_result, dict):
676
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677
      return True
678

    
679
    # compares ganeti version
680
    local_version = constants.PROTOCOL_VERSION
681
    remote_version = node_result.get('version', None)
682
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
683
            len(remote_version) == 2):
684
      feedback_fn("  - ERROR: connection to %s failed" % (node))
685
      return True
686

    
687
    if local_version != remote_version[0]:
688
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689
                  " node %s %s" % (local_version, node, remote_version[0]))
690
      return True
691

    
692
    # node seems compatible, we can actually try to look into its results
693

    
694
    bad = False
695

    
696
    # full package version
697
    if constants.RELEASE_VERSION != remote_version[1]:
698
      feedback_fn("  - WARNING: software version mismatch: master %s,"
699
                  " node %s %s" %
700
                  (constants.RELEASE_VERSION, node, remote_version[1]))
701

    
702
    # checks vg existence and size > 20G
703

    
704
    vglist = node_result.get(constants.NV_VGLIST, None)
705
    if not vglist:
706
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707
                      (node,))
708
      bad = True
709
    else:
710
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
711
                                            constants.MIN_VG_SIZE)
712
      if vgstatus:
713
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714
        bad = True
715

    
716
    # checks config file checksum
717

    
718
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
719
    if not isinstance(remote_cksum, dict):
720
      bad = True
721
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
722
    else:
723
      for file_name in file_list:
724
        node_is_mc = nodeinfo.master_candidate
725
        must_have_file = file_name not in master_files
726
        if file_name not in remote_cksum:
727
          if node_is_mc or must_have_file:
728
            bad = True
729
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
730
        elif remote_cksum[file_name] != local_cksum[file_name]:
731
          if node_is_mc or must_have_file:
732
            bad = True
733
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734
          else:
735
            # not candidate and this is not a must-have file
736
            bad = True
737
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738
                        " '%s'" % file_name)
739
        else:
740
          # all good, except non-master/non-must have combination
741
          if not node_is_mc and not must_have_file:
742
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
743
                        " candidates" % file_name)
744

    
745
    # checks ssh to any
746

    
747
    if constants.NV_NODELIST not in node_result:
748
      bad = True
749
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750
    else:
751
      if node_result[constants.NV_NODELIST]:
752
        bad = True
753
        for node in node_result[constants.NV_NODELIST]:
754
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755
                          (node, node_result[constants.NV_NODELIST][node]))
756

    
757
    if constants.NV_NODENETTEST not in node_result:
758
      bad = True
759
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760
    else:
761
      if node_result[constants.NV_NODENETTEST]:
762
        bad = True
763
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764
        for node in nlist:
765
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766
                          (node, node_result[constants.NV_NODENETTEST][node]))
767

    
768
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769
    if isinstance(hyp_result, dict):
770
      for hv_name, hv_result in hyp_result.iteritems():
771
        if hv_result is not None:
772
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773
                      (hv_name, hv_result))
774

    
775
    # check used drbd list
776
    used_minors = node_result.get(constants.NV_DRBDLIST, [])
777
    if not isinstance(used_minors, (tuple, list)):
778
      feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
779
                  str(used_minors))
780
    else:
781
      for minor, (iname, must_exist) in drbd_map.items():
782
        if minor not in used_minors and must_exist:
783
          feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
784
                      (minor, iname))
785
          bad = True
786
      for minor in used_minors:
787
        if minor not in drbd_map:
788
          feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
789
          bad = True
790

    
791
    return bad
792

    
793
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
794
                      node_instance, feedback_fn, n_offline):
795
    """Verify an instance.
796

797
    This function checks to see if the required block devices are
798
    available on the instance's node.
799

800
    """
801
    bad = False
802

    
803
    node_current = instanceconfig.primary_node
804

    
805
    node_vol_should = {}
806
    instanceconfig.MapLVsByNode(node_vol_should)
807

    
808
    for node in node_vol_should:
809
      if node in n_offline:
810
        # ignore missing volumes on offline nodes
811
        continue
812
      for volume in node_vol_should[node]:
813
        if node not in node_vol_is or volume not in node_vol_is[node]:
814
          feedback_fn("  - ERROR: volume %s missing on node %s" %
815
                          (volume, node))
816
          bad = True
817

    
818
    if instanceconfig.admin_up:
819
      if ((node_current not in node_instance or
820
          not instance in node_instance[node_current]) and
821
          node_current not in n_offline):
822
        feedback_fn("  - ERROR: instance %s not running on node %s" %
823
                        (instance, node_current))
824
        bad = True
825

    
826
    for node in node_instance:
827
      if (not node == node_current):
828
        if instance in node_instance[node]:
829
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
830
                          (instance, node))
831
          bad = True
832

    
833
    return bad
834

    
835
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
836
    """Verify if there are any unknown volumes in the cluster.
837

838
    The .os, .swap and backup volumes are ignored. All other volumes are
839
    reported as unknown.
840

841
    """
842
    bad = False
843

    
844
    for node in node_vol_is:
845
      for volume in node_vol_is[node]:
846
        if node not in node_vol_should or volume not in node_vol_should[node]:
847
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
848
                      (volume, node))
849
          bad = True
850
    return bad
851

    
852
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
853
    """Verify the list of running instances.
854

855
    This checks what instances are running but unknown to the cluster.
856

857
    """
858
    bad = False
859
    for node in node_instance:
860
      for runninginstance in node_instance[node]:
861
        if runninginstance not in instancelist:
862
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
863
                          (runninginstance, node))
864
          bad = True
865
    return bad
866

    
867
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
868
    """Verify N+1 Memory Resilience.
869

870
    Check that if one single node dies we can still start all the instances it
871
    was primary for.
872

873
    """
874
    bad = False
875

    
876
    for node, nodeinfo in node_info.iteritems():
877
      # This code checks that every node which is now listed as secondary has
878
      # enough memory to host all instances it is supposed to, should a single
879
      # other node in the cluster fail.
880
      # FIXME: not ready for failover to an arbitrary node
881
      # FIXME: does not support file-backed instances
882
      # WARNING: we currently take into account down instances as well as up
883
      # ones, considering that even if they're down someone might want to start
884
      # them even in the event of a node failure.
885
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
886
        needed_mem = 0
887
        for instance in instances:
888
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
889
          if bep[constants.BE_AUTO_BALANCE]:
890
            needed_mem += bep[constants.BE_MEMORY]
891
        if nodeinfo['mfree'] < needed_mem:
892
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
893
                      " failovers should node %s fail" % (node, prinode))
894
          bad = True
895
    return bad
896

    
897
  def CheckPrereq(self):
898
    """Check prerequisites.
899

900
    Transform the list of checks we're going to skip into a set and check that
901
    all its members are valid.
902

903
    """
904
    self.skip_set = frozenset(self.op.skip_checks)
905
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
906
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
907

    
908
  def BuildHooksEnv(self):
909
    """Build hooks env.
910

911
    Cluster-Verify hooks are run only in the post phase; their failure causes
912
    the output to be logged in the verify output and the verification to fail.
913

914
    """
915
    all_nodes = self.cfg.GetNodeList()
916
    # TODO: populate the environment with useful information for verify hooks
917
    env = {}
918
    return env, [], all_nodes
919

    
920
  def Exec(self, feedback_fn):
921
    """Verify integrity of cluster, performing various test on nodes.
922

923
    """
924
    bad = False
925
    feedback_fn("* Verifying global settings")
926
    for msg in self.cfg.VerifyConfig():
927
      feedback_fn("  - ERROR: %s" % msg)
928

    
929
    vg_name = self.cfg.GetVGName()
930
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
931
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
932
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
933
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
934
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
935
                        for iname in instancelist)
936
    i_non_redundant = [] # Non redundant instances
937
    i_non_a_balanced = [] # Non auto-balanced instances
938
    n_offline = [] # List of offline nodes
939
    n_drained = [] # List of nodes being drained
940
    node_volume = {}
941
    node_instance = {}
942
    node_info = {}
943
    instance_cfg = {}
944

    
945
    # FIXME: verify OS list
946
    # do local checksums
947
    master_files = [constants.CLUSTER_CONF_FILE]
948

    
949
    file_names = ssconf.SimpleStore().GetFileList()
950
    file_names.append(constants.SSL_CERT_FILE)
951
    file_names.append(constants.RAPI_CERT_FILE)
952
    file_names.extend(master_files)
953

    
954
    local_checksums = utils.FingerprintFiles(file_names)
955

    
956
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
957
    node_verify_param = {
958
      constants.NV_FILELIST: file_names,
959
      constants.NV_NODELIST: [node.name for node in nodeinfo
960
                              if not node.offline],
961
      constants.NV_HYPERVISOR: hypervisors,
962
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
963
                                  node.secondary_ip) for node in nodeinfo
964
                                 if not node.offline],
965
      constants.NV_LVLIST: vg_name,
966
      constants.NV_INSTANCELIST: hypervisors,
967
      constants.NV_VGLIST: None,
968
      constants.NV_VERSION: None,
969
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
970
      constants.NV_DRBDLIST: None,
971
      }
972
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
973
                                           self.cfg.GetClusterName())
974

    
975
    cluster = self.cfg.GetClusterInfo()
976
    master_node = self.cfg.GetMasterNode()
977
    all_drbd_map = self.cfg.ComputeDRBDMap()
978

    
979
    for node_i in nodeinfo:
980
      node = node_i.name
981
      nresult = all_nvinfo[node].data
982

    
983
      if node_i.offline:
984
        feedback_fn("* Skipping offline node %s" % (node,))
985
        n_offline.append(node)
986
        continue
987

    
988
      if node == master_node:
989
        ntype = "master"
990
      elif node_i.master_candidate:
991
        ntype = "master candidate"
992
      elif node_i.drained:
993
        ntype = "drained"
994
        n_drained.append(node)
995
      else:
996
        ntype = "regular"
997
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
998

    
999
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1000
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1001
        bad = True
1002
        continue
1003

    
1004
      node_drbd = {}
1005
      for minor, instance in all_drbd_map[node].items():
1006
        instance = instanceinfo[instance]
1007
        node_drbd[minor] = (instance.name, instance.admin_up)
1008
      result = self._VerifyNode(node_i, file_names, local_checksums,
1009
                                nresult, feedback_fn, master_files,
1010
                                node_drbd)
1011
      bad = bad or result
1012

    
1013
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1014
      if isinstance(lvdata, basestring):
1015
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1016
                    (node, utils.SafeEncode(lvdata)))
1017
        bad = True
1018
        node_volume[node] = {}
1019
      elif not isinstance(lvdata, dict):
1020
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1021
        bad = True
1022
        continue
1023
      else:
1024
        node_volume[node] = lvdata
1025

    
1026
      # node_instance
1027
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1028
      if not isinstance(idata, list):
1029
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1030
                    (node,))
1031
        bad = True
1032
        continue
1033

    
1034
      node_instance[node] = idata
1035

    
1036
      # node_info
1037
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1038
      if not isinstance(nodeinfo, dict):
1039
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1040
        bad = True
1041
        continue
1042

    
1043
      try:
1044
        node_info[node] = {
1045
          "mfree": int(nodeinfo['memory_free']),
1046
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1047
          "pinst": [],
1048
          "sinst": [],
1049
          # dictionary holding all instances this node is secondary for,
1050
          # grouped by their primary node. Each key is a cluster node, and each
1051
          # value is a list of instances which have the key as primary and the
1052
          # current node as secondary.  This is handy to calculate N+1 memory
1053
          # availability if you can only failover from a primary to its
1054
          # secondary.
1055
          "sinst-by-pnode": {},
1056
        }
1057
      except ValueError:
1058
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1059
        bad = True
1060
        continue
1061

    
1062
    node_vol_should = {}
1063

    
1064
    for instance in instancelist:
1065
      feedback_fn("* Verifying instance %s" % instance)
1066
      inst_config = instanceinfo[instance]
1067
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1068
                                     node_instance, feedback_fn, n_offline)
1069
      bad = bad or result
1070
      inst_nodes_offline = []
1071

    
1072
      inst_config.MapLVsByNode(node_vol_should)
1073

    
1074
      instance_cfg[instance] = inst_config
1075

    
1076
      pnode = inst_config.primary_node
1077
      if pnode in node_info:
1078
        node_info[pnode]['pinst'].append(instance)
1079
      elif pnode not in n_offline:
1080
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1081
                    " %s failed" % (instance, pnode))
1082
        bad = True
1083

    
1084
      if pnode in n_offline:
1085
        inst_nodes_offline.append(pnode)
1086

    
1087
      # If the instance is non-redundant we cannot survive losing its primary
1088
      # node, so we are not N+1 compliant. On the other hand we have no disk
1089
      # templates with more than one secondary so that situation is not well
1090
      # supported either.
1091
      # FIXME: does not support file-backed instances
1092
      if len(inst_config.secondary_nodes) == 0:
1093
        i_non_redundant.append(instance)
1094
      elif len(inst_config.secondary_nodes) > 1:
1095
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1096
                    % instance)
1097

    
1098
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1099
        i_non_a_balanced.append(instance)
1100

    
1101
      for snode in inst_config.secondary_nodes:
1102
        if snode in node_info:
1103
          node_info[snode]['sinst'].append(instance)
1104
          if pnode not in node_info[snode]['sinst-by-pnode']:
1105
            node_info[snode]['sinst-by-pnode'][pnode] = []
1106
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1107
        elif snode not in n_offline:
1108
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1109
                      " %s failed" % (instance, snode))
1110
          bad = True
1111
        if snode in n_offline:
1112
          inst_nodes_offline.append(snode)
1113

    
1114
      if inst_nodes_offline:
1115
        # warn that the instance lives on offline nodes, and set bad=True
1116
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1117
                    ", ".join(inst_nodes_offline))
1118
        bad = True
1119

    
1120
    feedback_fn("* Verifying orphan volumes")
1121
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1122
                                       feedback_fn)
1123
    bad = bad or result
1124

    
1125
    feedback_fn("* Verifying remaining instances")
1126
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1127
                                         feedback_fn)
1128
    bad = bad or result
1129

    
1130
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1131
      feedback_fn("* Verifying N+1 Memory redundancy")
1132
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1133
      bad = bad or result
1134

    
1135
    feedback_fn("* Other Notes")
1136
    if i_non_redundant:
1137
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1138
                  % len(i_non_redundant))
1139

    
1140
    if i_non_a_balanced:
1141
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1142
                  % len(i_non_a_balanced))
1143

    
1144
    if n_offline:
1145
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1146

    
1147
    if n_drained:
1148
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1149

    
1150
    return not bad
1151

    
1152
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1153
    """Analize the post-hooks' result
1154

1155
    This method analyses the hook result, handles it, and sends some
1156
    nicely-formatted feedback back to the user.
1157

1158
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1159
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1160
    @param hooks_results: the results of the multi-node hooks rpc call
1161
    @param feedback_fn: function used to send feedback back to the caller
1162
    @param lu_result: previous Exec result
1163
    @return: the new Exec result, based on the previous result
1164
        and hook results
1165

1166
    """
1167
    # We only really run POST phase hooks, and are only interested in
1168
    # their results
1169
    if phase == constants.HOOKS_PHASE_POST:
1170
      # Used to change hooks' output to proper indentation
1171
      indent_re = re.compile('^', re.M)
1172
      feedback_fn("* Hooks Results")
1173
      if not hooks_results:
1174
        feedback_fn("  - ERROR: general communication failure")
1175
        lu_result = 1
1176
      else:
1177
        for node_name in hooks_results:
1178
          show_node_header = True
1179
          res = hooks_results[node_name]
1180
          if res.failed or res.data is False or not isinstance(res.data, list):
1181
            if res.offline:
1182
              # no need to warn or set fail return value
1183
              continue
1184
            feedback_fn("    Communication failure in hooks execution")
1185
            lu_result = 1
1186
            continue
1187
          for script, hkr, output in res.data:
1188
            if hkr == constants.HKR_FAIL:
1189
              # The node header is only shown once, if there are
1190
              # failing hooks on that node
1191
              if show_node_header:
1192
                feedback_fn("  Node %s:" % node_name)
1193
                show_node_header = False
1194
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1195
              output = indent_re.sub('      ', output)
1196
              feedback_fn("%s" % output)
1197
              lu_result = 1
1198

    
1199
      return lu_result
1200

    
1201

    
1202
class LUVerifyDisks(NoHooksLU):
1203
  """Verifies the cluster disks status.
1204

1205
  """
1206
  _OP_REQP = []
1207
  REQ_BGL = False
1208

    
1209
  def ExpandNames(self):
1210
    self.needed_locks = {
1211
      locking.LEVEL_NODE: locking.ALL_SET,
1212
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1213
    }
1214
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1215

    
1216
  def CheckPrereq(self):
1217
    """Check prerequisites.
1218

1219
    This has no prerequisites.
1220

1221
    """
1222
    pass
1223

    
1224
  def Exec(self, feedback_fn):
1225
    """Verify integrity of cluster disks.
1226

1227
    """
1228
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
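    # Note: the chained assignment above makes "result" a tuple holding the
    # very same four containers, so filling them in below also fills in the
    # value returned by this method.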
1229

    
1230
    vg_name = self.cfg.GetVGName()
1231
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1232
    instances = [self.cfg.GetInstanceInfo(name)
1233
                 for name in self.cfg.GetInstanceList()]
1234

    
1235
    nv_dict = {}
1236
    for inst in instances:
1237
      inst_lvs = {}
1238
      if (not inst.admin_up or
1239
          inst.disk_template not in constants.DTS_NET_MIRROR):
1240
        continue
1241
      inst.MapLVsByNode(inst_lvs)
1242
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1243
      for node, vol_list in inst_lvs.iteritems():
1244
        for vol in vol_list:
1245
          nv_dict[(node, vol)] = inst
1246

    
1247
    if not nv_dict:
1248
      return result
1249

    
1250
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1251

    
1252
    to_act = set()
1253
    for node in nodes:
1254
      # node_volume
1255
      lvs = node_lvs[node]
1256
      if lvs.failed:
1257
        if not lvs.offline:
1258
          self.LogWarning("Connection to node %s failed: %s" %
1259
                          (node, lvs.data))
1260
        continue
1261
      lvs = lvs.data
1262
      if isinstance(lvs, basestring):
1263
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1264
        res_nlvm[node] = lvs
1265
      elif not isinstance(lvs, dict):
1266
        logging.warning("Connection to node %s failed or invalid data"
1267
                        " returned", node)
1268
        res_nodes.append(node)
1269
        continue
1270

    
1271
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1272
        inst = nv_dict.pop((node, lv_name), None)
1273
        if (not lv_online and inst is not None
1274
            and inst.name not in res_instances):
1275
          res_instances.append(inst.name)
1276

    
1277
    # any leftover items in nv_dict are missing LVs, let's arrange the
1278
    # data better
1279
    for key, inst in nv_dict.iteritems():
1280
      if inst.name not in res_missing:
1281
        res_missing[inst.name] = []
1282
      res_missing[inst.name].append(key)
1283

    
1284
    return result
1285

    
1286

    
1287
class LURenameCluster(LogicalUnit):
1288
  """Rename the cluster.
1289

1290
  """
1291
  HPATH = "cluster-rename"
1292
  HTYPE = constants.HTYPE_CLUSTER
1293
  _OP_REQP = ["name"]
1294

    
1295
  def BuildHooksEnv(self):
1296
    """Build hooks env.
1297

1298
    """
1299
    env = {
1300
      "OP_TARGET": self.cfg.GetClusterName(),
1301
      "NEW_NAME": self.op.name,
1302
      }
1303
    mn = self.cfg.GetMasterNode()
1304
    return env, [mn], [mn]
1305

    
1306
  def CheckPrereq(self):
1307
    """Verify that the passed name is a valid one.
1308

1309
    """
1310
    hostname = utils.HostInfo(self.op.name)
1311

    
1312
    new_name = hostname.name
1313
    self.ip = new_ip = hostname.ip
1314
    old_name = self.cfg.GetClusterName()
1315
    old_ip = self.cfg.GetMasterIP()
1316
    if new_name == old_name and new_ip == old_ip:
1317
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1318
                                 " cluster has changed")
1319
    if new_ip != old_ip:
1320
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1321
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1322
                                   " reachable on the network. Aborting." %
1323
                                   new_ip)
1324

    
1325
    self.op.name = new_name
1326

    
1327
  def Exec(self, feedback_fn):
1328
    """Rename the cluster.
1329

1330
    """
1331
    clustername = self.op.name
1332
    ip = self.ip
1333

    
1334
    # shutdown the master IP
1335
    master = self.cfg.GetMasterNode()
1336
    result = self.rpc.call_node_stop_master(master, False)
1337
    if result.failed or not result.data:
1338
      raise errors.OpExecError("Could not disable the master role")
1339

    
1340
    try:
1341
      cluster = self.cfg.GetClusterInfo()
1342
      cluster.cluster_name = clustername
1343
      cluster.master_ip = ip
1344
      self.cfg.Update(cluster)
1345

    
1346
      # update the known hosts file
1347
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1348
      node_list = self.cfg.GetNodeList()
1349
      try:
1350
        node_list.remove(master)
1351
      except ValueError:
1352
        pass
1353
      result = self.rpc.call_upload_file(node_list,
1354
                                         constants.SSH_KNOWN_HOSTS_FILE)
1355
      for to_node, to_result in result.iteritems():
1356
        if to_result.failed or not to_result.data:
1357
          logging.error("Copy of file %s to node %s failed",
1358
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1359

    
1360
    finally:
1361
      result = self.rpc.call_node_start_master(master, False)
1362
      if result.failed or not result.data:
1363
        self.LogWarning("Could not re-enable the master role on"
1364
                        " the master, please restart manually.")
1365

    
1366

    
1367
def _RecursiveCheckIfLVMBased(disk):
1368
  """Check if the given disk or its children are lvm-based.
1369

1370
  @type disk: L{objects.Disk}
1371
  @param disk: the disk to check
1372
  @rtype: boolean
1373
  @return: boolean indicating whether a LD_LV dev_type was found or not
1374

1375
  """
1376
  if disk.children:
1377
    for chdisk in disk.children:
1378
      if _RecursiveCheckIfLVMBased(chdisk):
1379
        return True
1380
  return disk.dev_type == constants.LD_LV
1381

    
1382

    
1383
class LUSetClusterParams(LogicalUnit):
1384
  """Change the parameters of the cluster.
1385

1386
  """
1387
  HPATH = "cluster-modify"
1388
  HTYPE = constants.HTYPE_CLUSTER
1389
  _OP_REQP = []
1390
  REQ_BGL = False
1391

    
1392
  def CheckParameters(self):
1393
    """Check parameters
1394

1395
    """
1396
    if not hasattr(self.op, "candidate_pool_size"):
1397
      self.op.candidate_pool_size = None
1398
    if self.op.candidate_pool_size is not None:
1399
      try:
1400
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1401
      except ValueError, err:
1402
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1403
                                   str(err))
1404
      if self.op.candidate_pool_size < 1:
1405
        raise errors.OpPrereqError("At least one master candidate needed")
1406

    
1407
  def ExpandNames(self):
1408
    # FIXME: in the future maybe other cluster params won't require checking on
1409
    # all nodes to be modified.
1410
    self.needed_locks = {
1411
      locking.LEVEL_NODE: locking.ALL_SET,
1412
    }
1413
    self.share_locks[locking.LEVEL_NODE] = 1
1414

    
1415
  def BuildHooksEnv(self):
1416
    """Build hooks env.
1417

1418
    """
1419
    env = {
1420
      "OP_TARGET": self.cfg.GetClusterName(),
1421
      "NEW_VG_NAME": self.op.vg_name,
1422
      }
1423
    mn = self.cfg.GetMasterNode()
1424
    return env, [mn], [mn]
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks whether the given params don't conflict and
1430
    whether the given volume group is valid.
1431

1432
    """
1433
    if self.op.vg_name is not None and not self.op.vg_name:
1434
      instances = self.cfg.GetAllInstancesInfo().values()
1435
      for inst in instances:
1436
        for disk in inst.disks:
1437
          if _RecursiveCheckIfLVMBased(disk):
1438
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1439
                                       " lvm-based instances exist")
1440

    
1441
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1442

    
1443
    # if vg_name not None, checks given volume group on all nodes
1444
    if self.op.vg_name:
1445
      vglist = self.rpc.call_vg_list(node_list)
1446
      for node in node_list:
1447
        if vglist[node].failed:
1448
          # ignoring down node
1449
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1450
          continue
1451
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1452
                                              self.op.vg_name,
1453
                                              constants.MIN_VG_SIZE)
1454
        if vgstatus:
1455
          raise errors.OpPrereqError("Error on node '%s': %s" %
1456
                                     (node, vgstatus))
1457

    
1458
    self.cluster = cluster = self.cfg.GetClusterInfo()
1459
    # validate beparams changes
1460
    if self.op.beparams:
1461
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1462
      self.new_beparams = cluster.FillDict(
1463
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1464

    
1465
    # hypervisor list/parameters
1466
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1467
    if self.op.hvparams:
1468
      if not isinstance(self.op.hvparams, dict):
1469
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1470
      for hv_name, hv_dict in self.op.hvparams.items():
1471
        if hv_name not in self.new_hvparams:
1472
          self.new_hvparams[hv_name] = hv_dict
1473
        else:
1474
          self.new_hvparams[hv_name].update(hv_dict)
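    # Illustrative sketch of the merge above, with hypothetical values:
    # existing per-hypervisor parameters are kept and only the keys given
    # in the opcode are overridden, e.g.
    #   current: {"xen-pvm": {"kernel_path": "/boot/vmlinuz-2.6-xenU"}}
    #   opcode:  {"xen-pvm": {"root_path": "/dev/sda1"}}
    #   merged:  {"xen-pvm": {"kernel_path": "/boot/vmlinuz-2.6-xenU",
    #                         "root_path": "/dev/sda1"}}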
1475

    
1476
    if self.op.enabled_hypervisors is not None:
1477
      self.hv_list = self.op.enabled_hypervisors
1478
    else:
1479
      self.hv_list = cluster.enabled_hypervisors
1480

    
1481
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1482
      # either the enabled list has changed, or the parameters have, validate
1483
      for hv_name, hv_params in self.new_hvparams.items():
1484
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1485
            (self.op.enabled_hypervisors and
1486
             hv_name in self.op.enabled_hypervisors)):
1487
          # either this is a new hypervisor, or its parameters have changed
1488
          hv_class = hypervisor.GetHypervisor(hv_name)
1489
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1490
          hv_class.CheckParameterSyntax(hv_params)
1491
          _CheckHVParams(self, node_list, hv_name, hv_params)
1492

    
1493
  def Exec(self, feedback_fn):
1494
    """Change the parameters of the cluster.
1495

1496
    """
1497
    if self.op.vg_name is not None:
1498
      if self.op.vg_name != self.cfg.GetVGName():
1499
        self.cfg.SetVGName(self.op.vg_name)
1500
      else:
1501
        feedback_fn("Cluster LVM configuration already in desired"
1502
                    " state, not changing")
1503
    if self.op.hvparams:
1504
      self.cluster.hvparams = self.new_hvparams
1505
    if self.op.enabled_hypervisors is not None:
1506
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1507
    if self.op.beparams:
1508
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1509
    if self.op.candidate_pool_size is not None:
1510
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1511

    
1512
    self.cfg.Update(self.cluster)
1513

    
1514
    # we want to update nodes after the cluster so that if any errors
1515
    # happen, we have recorded and saved the cluster info
1516
    if self.op.candidate_pool_size is not None:
1517
      _AdjustCandidatePool(self)
1518

    
1519

    
1520
class LURedistributeConfig(NoHooksLU):
1521
  """Force the redistribution of cluster configuration.
1522

1523
  This is a very simple LU.
1524

1525
  """
1526
  _OP_REQP = []
1527
  REQ_BGL = False
1528

    
1529
  def ExpandNames(self):
1530
    self.needed_locks = {
1531
      locking.LEVEL_NODE: locking.ALL_SET,
1532
    }
1533
    self.share_locks[locking.LEVEL_NODE] = 1
1534

    
1535
  def CheckPrereq(self):
1536
    """Check prerequisites.
1537

1538
    """
1539

    
1540
  def Exec(self, feedback_fn):
1541
    """Redistribute the configuration.
1542

1543
    """
1544
    self.cfg.Update(self.cfg.GetClusterInfo())
1545

    
1546

    
1547
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1548
  """Sleep and poll for an instance's disk to sync.
1549

1550
  """
1551
  if not instance.disks:
1552
    return True
1553

    
1554
  if not oneshot:
1555
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1556

    
1557
  node = instance.primary_node
1558

    
1559
  for dev in instance.disks:
1560
    lu.cfg.SetDiskID(dev, node)
1561

    
1562
  retries = 0
1563
  while True:
1564
    max_time = 0
1565
    done = True
1566
    cumul_degraded = False
1567
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1568
    if rstats.failed or not rstats.data:
1569
      lu.LogWarning("Can't get any data from node %s", node)
1570
      retries += 1
1571
      if retries >= 10:
1572
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1573
                                 " aborting." % node)
1574
      time.sleep(6)
1575
      continue
1576
    rstats = rstats.data
1577
    retries = 0
1578
    for i, mstat in enumerate(rstats):
1579
      if mstat is None:
1580
        lu.LogWarning("Can't compute data for node %s/%s",
1581
                           node, instance.disks[i].iv_name)
1582
        continue
1583
      # we ignore the ldisk parameter
1584
      perc_done, est_time, is_degraded, _ = mstat
1585
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1586
      if perc_done is not None:
1587
        done = False
1588
        if est_time is not None:
1589
          rem_time = "%d estimated seconds remaining" % est_time
1590
          max_time = est_time
1591
        else:
1592
          rem_time = "no time estimate"
1593
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1594
                        (instance.disks[i].iv_name, perc_done, rem_time))
1595
    if done or oneshot:
1596
      break
1597

    
1598
    time.sleep(min(60, max_time))
1599

    
1600
  if done:
1601
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1602
  return not cumul_degraded
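  # The per-disk status tuples unpacked above have the shape
  # (sync_percent, estimated_time, is_degraded, ldisk); sync_percent is
  # None once the mirror is fully in sync. Illustrative (assumed) values:
  #   (78.5, 120, True, False)   -> still syncing, about two minutes left
  #   (None, None, False, False) -> in sync and healthy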
1603

    
1604

    
1605
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1606
  """Check that mirrors are not degraded.
1607

1608
  The ldisk parameter, if True, will change the test from the
1609
  is_degraded attribute (which represents overall non-ok status for
1610
  the device(s)) to the ldisk (representing the local storage status).
1611

1612
  """
1613
  lu.cfg.SetDiskID(dev, node)
1614
  if ldisk:
1615
    idx = 6
1616
  else:
1617
    idx = 5
1618

    
1619
  result = True
1620
  if on_primary or dev.AssembleOnSecondary():
1621
    rstats = lu.rpc.call_blockdev_find(node, dev)
1622
    msg = rstats.RemoteFailMsg()
1623
    if msg:
1624
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1625
      result = False
1626
    elif not rstats.payload:
1627
      lu.LogWarning("Can't find disk on node %s", node)
1628
      result = False
1629
    else:
1630
      result = result and (not rstats.payload[idx])
1631
  if dev.children:
1632
    for child in dev.children:
1633
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1634

    
1635
  return result
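  # Index 5 of the blockdev_find payload is the is_degraded flag and index
  # 6 the ldisk (local storage) status, hence the idx selection above. A
  # minimal usage sketch, checking only local storage on a secondary node
  # (hypothetical arguments):
  #
  #   ok = _CheckDiskConsistency(lu, instance.disks[0], secondary_node,
  #                              on_primary=False, ldisk=True)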
1636

    
1637

    
1638
class LUDiagnoseOS(NoHooksLU):
1639
  """Logical unit for OS diagnose/query.
1640

1641
  """
1642
  _OP_REQP = ["output_fields", "names"]
1643
  REQ_BGL = False
1644
  _FIELDS_STATIC = utils.FieldSet()
1645
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1646

    
1647
  def ExpandNames(self):
1648
    if self.op.names:
1649
      raise errors.OpPrereqError("Selective OS query not supported")
1650

    
1651
    _CheckOutputFields(static=self._FIELDS_STATIC,
1652
                       dynamic=self._FIELDS_DYNAMIC,
1653
                       selected=self.op.output_fields)
1654

    
1655
    # Lock all nodes, in shared mode
1656
    self.needed_locks = {}
1657
    self.share_locks[locking.LEVEL_NODE] = 1
1658
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1659

    
1660
  def CheckPrereq(self):
1661
    """Check prerequisites.
1662

1663
    """
1664

    
1665
  @staticmethod
1666
  def _DiagnoseByOS(node_list, rlist):
1667
    """Remaps a per-node return list into an a per-os per-node dictionary
1668

1669
    @param node_list: a list with the names of all nodes
1670
    @param rlist: a map with node names as keys and OS objects as values
1671

1672
    @rtype: dict
1673
    @returns: a dictionary with osnames as keys and as value another map, with
1674
        nodes as keys and list of OS objects as values, eg::
1675

1676
          {"debian-etch": {"node1": [<object>,...],
1677
                           "node2": [<object>,]}
1678
          }
1679

1680
    """
1681
    all_os = {}
1682
    for node_name, nr in rlist.iteritems():
1683
      if nr.failed or not nr.data:
1684
        continue
1685
      for os_obj in nr.data:
1686
        if os_obj.name not in all_os:
1687
          # build a list of nodes for this os containing empty lists
1688
          # for each node in node_list
1689
          all_os[os_obj.name] = {}
1690
          for nname in node_list:
1691
            all_os[os_obj.name][nname] = []
1692
        all_os[os_obj.name][node_name].append(os_obj)
1693
    return all_os
1694

    
1695
  def Exec(self, feedback_fn):
1696
    """Compute the list of OSes.
1697

1698
    """
1699
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1700
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1701
                   if node in node_list]
1702
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1703
    if node_data == False:
1704
      raise errors.OpExecError("Can't gather the list of OSes")
1705
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1706
    output = []
1707
    for os_name, os_data in pol.iteritems():
1708
      row = []
1709
      for field in self.op.output_fields:
1710
        if field == "name":
1711
          val = os_name
1712
        elif field == "valid":
1713
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1714
        elif field == "node_status":
1715
          val = {}
1716
          for node_name, nos_list in os_data.iteritems():
1717
            val[node_name] = [(v.status, v.path) for v in nos_list]
1718
        else:
1719
          raise errors.ParameterError(field)
1720
        row.append(val)
1721
      output.append(row)
1722

    
1723
    return output
1724

    
1725

    
1726
class LURemoveNode(LogicalUnit):
1727
  """Logical unit for removing a node.
1728

1729
  """
1730
  HPATH = "node-remove"
1731
  HTYPE = constants.HTYPE_NODE
1732
  _OP_REQP = ["node_name"]
1733

    
1734
  def BuildHooksEnv(self):
1735
    """Build hooks env.
1736

1737
    This doesn't run on the target node in the pre phase as a failed
1738
    node would then be impossible to remove.
1739

1740
    """
1741
    env = {
1742
      "OP_TARGET": self.op.node_name,
1743
      "NODE_NAME": self.op.node_name,
1744
      }
1745
    all_nodes = self.cfg.GetNodeList()
1746
    all_nodes.remove(self.op.node_name)
1747
    return env, all_nodes, all_nodes
1748

    
1749
  def CheckPrereq(self):
1750
    """Check prerequisites.
1751

1752
    This checks:
1753
     - the node exists in the configuration
1754
     - it does not have primary or secondary instances
1755
     - it's not the master
1756

1757
    Any errors are signalled by raising errors.OpPrereqError.
1758

1759
    """
1760
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1761
    if node is None:
1762
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1763

    
1764
    instance_list = self.cfg.GetInstanceList()
1765

    
1766
    masternode = self.cfg.GetMasterNode()
1767
    if node.name == masternode:
1768
      raise errors.OpPrereqError("Node is the master node,"
1769
                                 " you need to failover first.")
1770

    
1771
    for instance_name in instance_list:
1772
      instance = self.cfg.GetInstanceInfo(instance_name)
1773
      if node.name in instance.all_nodes:
1774
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1775
                                   " please remove first." % instance_name)
1776
    self.op.node_name = node.name
1777
    self.node = node
1778

    
1779
  def Exec(self, feedback_fn):
1780
    """Removes the node from the cluster.
1781

1782
    """
1783
    node = self.node
1784
    logging.info("Stopping the node daemon and removing configs from node %s",
1785
                 node.name)
1786

    
1787
    self.context.RemoveNode(node.name)
1788

    
1789
    self.rpc.call_node_leave_cluster(node.name)
1790

    
1791
    # Promote nodes to master candidate as needed
1792
    _AdjustCandidatePool(self)
1793

    
1794

    
1795
class LUQueryNodes(NoHooksLU):
1796
  """Logical unit for querying nodes.
1797

1798
  """
1799
  _OP_REQP = ["output_fields", "names", "use_locking"]
1800
  REQ_BGL = False
1801
  _FIELDS_DYNAMIC = utils.FieldSet(
1802
    "dtotal", "dfree",
1803
    "mtotal", "mnode", "mfree",
1804
    "bootid",
1805
    "ctotal", "cnodes", "csockets",
1806
    )
1807

    
1808
  _FIELDS_STATIC = utils.FieldSet(
1809
    "name", "pinst_cnt", "sinst_cnt",
1810
    "pinst_list", "sinst_list",
1811
    "pip", "sip", "tags",
1812
    "serial_no",
1813
    "master_candidate",
1814
    "master",
1815
    "offline",
1816
    "drained",
1817
    )
1818

    
1819
  def ExpandNames(self):
1820
    _CheckOutputFields(static=self._FIELDS_STATIC,
1821
                       dynamic=self._FIELDS_DYNAMIC,
1822
                       selected=self.op.output_fields)
1823

    
1824
    self.needed_locks = {}
1825
    self.share_locks[locking.LEVEL_NODE] = 1
1826

    
1827
    if self.op.names:
1828
      self.wanted = _GetWantedNodes(self, self.op.names)
1829
    else:
1830
      self.wanted = locking.ALL_SET
1831

    
1832
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1833
    self.do_locking = self.do_node_query and self.op.use_locking
1834
    if self.do_locking:
1835
      # if we don't request only static fields, we need to lock the nodes
1836
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
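    # Example with hypothetical field lists: asking only for "name" and
    # "pip" is answered purely from the configuration (no node locks, no
    # RPC), while asking for "mfree" sets do_node_query and, combined with
    # use_locking, makes us acquire the node locks declared above.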
1837

    
1838

    
1839
  def CheckPrereq(self):
1840
    """Check prerequisites.
1841

1842
    """
1843
    # The validation of the node list is done in the _GetWantedNodes,
1844
    # if non empty, and if empty, there's no validation to do
1845
    pass
1846

    
1847
  def Exec(self, feedback_fn):
1848
    """Computes the list of nodes and their attributes.
1849

1850
    """
1851
    all_info = self.cfg.GetAllNodesInfo()
1852
    if self.do_locking:
1853
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1854
    elif self.wanted != locking.ALL_SET:
1855
      nodenames = self.wanted
1856
      missing = set(nodenames).difference(all_info.keys())
1857
      if missing:
1858
        raise errors.OpExecError(
1859
          "Some nodes were removed before retrieving their data: %s" % missing)
1860
    else:
1861
      nodenames = all_info.keys()
1862

    
1863
    nodenames = utils.NiceSort(nodenames)
1864
    nodelist = [all_info[name] for name in nodenames]
1865

    
1866
    # begin data gathering
1867

    
1868
    if self.do_node_query:
1869
      live_data = {}
1870
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1871
                                          self.cfg.GetHypervisorType())
1872
      for name in nodenames:
1873
        nodeinfo = node_data[name]
1874
        if not nodeinfo.failed and nodeinfo.data:
1875
          nodeinfo = nodeinfo.data
1876
          fn = utils.TryConvert
1877
          live_data[name] = {
1878
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1879
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1880
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1881
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1882
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1883
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1884
            "bootid": nodeinfo.get('bootid', None),
1885
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1886
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1887
            }
1888
        else:
1889
          live_data[name] = {}
1890
    else:
1891
      live_data = dict.fromkeys(nodenames, {})
1892

    
1893
    node_to_primary = dict([(name, set()) for name in nodenames])
1894
    node_to_secondary = dict([(name, set()) for name in nodenames])
1895

    
1896
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1897
                             "sinst_cnt", "sinst_list"))
1898
    if inst_fields & frozenset(self.op.output_fields):
1899
      instancelist = self.cfg.GetInstanceList()
1900

    
1901
      for instance_name in instancelist:
1902
        inst = self.cfg.GetInstanceInfo(instance_name)
1903
        if inst.primary_node in node_to_primary:
1904
          node_to_primary[inst.primary_node].add(inst.name)
1905
        for secnode in inst.secondary_nodes:
1906
          if secnode in node_to_secondary:
1907
            node_to_secondary[secnode].add(inst.name)
1908

    
1909
    master_node = self.cfg.GetMasterNode()
1910

    
1911
    # end data gathering
1912

    
1913
    output = []
1914
    for node in nodelist:
1915
      node_output = []
1916
      for field in self.op.output_fields:
1917
        if field == "name":
1918
          val = node.name
1919
        elif field == "pinst_list":
1920
          val = list(node_to_primary[node.name])
1921
        elif field == "sinst_list":
1922
          val = list(node_to_secondary[node.name])
1923
        elif field == "pinst_cnt":
1924
          val = len(node_to_primary[node.name])
1925
        elif field == "sinst_cnt":
1926
          val = len(node_to_secondary[node.name])
1927
        elif field == "pip":
1928
          val = node.primary_ip
1929
        elif field == "sip":
1930
          val = node.secondary_ip
1931
        elif field == "tags":
1932
          val = list(node.GetTags())
1933
        elif field == "serial_no":
1934
          val = node.serial_no
1935
        elif field == "master_candidate":
1936
          val = node.master_candidate
1937
        elif field == "master":
1938
          val = node.name == master_node
1939
        elif field == "offline":
1940
          val = node.offline
1941
        elif field == "drained":
1942
          val = node.drained
1943
        elif self._FIELDS_DYNAMIC.Matches(field):
1944
          val = live_data[node.name].get(field, None)
1945
        else:
1946
          raise errors.ParameterError(field)
1947
        node_output.append(val)
1948
      output.append(node_output)
1949

    
1950
    return output
1951

    
1952

    
1953
class LUQueryNodeVolumes(NoHooksLU):
1954
  """Logical unit for getting volumes on node(s).
1955

1956
  """
1957
  _OP_REQP = ["nodes", "output_fields"]
1958
  REQ_BGL = False
1959
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1960
  _FIELDS_STATIC = utils.FieldSet("node")
1961

    
1962
  def ExpandNames(self):
1963
    _CheckOutputFields(static=self._FIELDS_STATIC,
1964
                       dynamic=self._FIELDS_DYNAMIC,
1965
                       selected=self.op.output_fields)
1966

    
1967
    self.needed_locks = {}
1968
    self.share_locks[locking.LEVEL_NODE] = 1
1969
    if not self.op.nodes:
1970
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1971
    else:
1972
      self.needed_locks[locking.LEVEL_NODE] = \
1973
        _GetWantedNodes(self, self.op.nodes)
1974

    
1975
  def CheckPrereq(self):
1976
    """Check prerequisites.
1977

1978
    This checks that the fields required are valid output fields.
1979

1980
    """
1981
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1982

    
1983
  def Exec(self, feedback_fn):
1984
    """Computes the list of nodes and their attributes.
1985

1986
    """
1987
    nodenames = self.nodes
1988
    volumes = self.rpc.call_node_volumes(nodenames)
1989

    
1990
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1991
             in self.cfg.GetInstanceList()]
1992

    
1993
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1994

    
1995
    output = []
1996
    for node in nodenames:
1997
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1998
        continue
1999

    
2000
      node_vols = volumes[node].data[:]
2001
      node_vols.sort(key=lambda vol: vol['dev'])
2002

    
2003
      for vol in node_vols:
2004
        node_output = []
2005
        for field in self.op.output_fields:
2006
          if field == "node":
2007
            val = node
2008
          elif field == "phys":
2009
            val = vol['dev']
2010
          elif field == "vg":
2011
            val = vol['vg']
2012
          elif field == "name":
2013
            val = vol['name']
2014
          elif field == "size":
2015
            val = int(float(vol['size']))
2016
          elif field == "instance":
2017
            for inst in ilist:
2018
              if node not in lv_by_node[inst]:
2019
                continue
2020
              if vol['name'] in lv_by_node[inst][node]:
2021
                val = inst.name
2022
                break
2023
            else:
2024
              val = '-'
2025
          else:
2026
            raise errors.ParameterError(field)
2027
          node_output.append(str(val))
2028

    
2029
        output.append(node_output)
2030

    
2031
    return output
2032

    
2033

    
2034
class LUAddNode(LogicalUnit):
2035
  """Logical unit for adding node to the cluster.
2036

2037
  """
2038
  HPATH = "node-add"
2039
  HTYPE = constants.HTYPE_NODE
2040
  _OP_REQP = ["node_name"]
2041

    
2042
  def BuildHooksEnv(self):
2043
    """Build hooks env.
2044

2045
    This will run on all nodes before, and on all nodes + the new node after.
2046

2047
    """
2048
    env = {
2049
      "OP_TARGET": self.op.node_name,
2050
      "NODE_NAME": self.op.node_name,
2051
      "NODE_PIP": self.op.primary_ip,
2052
      "NODE_SIP": self.op.secondary_ip,
2053
      }
2054
    nodes_0 = self.cfg.GetNodeList()
2055
    nodes_1 = nodes_0 + [self.op.node_name, ]
2056
    return env, nodes_0, nodes_1
2057

    
2058
  def CheckPrereq(self):
2059
    """Check prerequisites.
2060

2061
    This checks:
2062
     - the new node is not already in the config
2063
     - it is resolvable
2064
     - its parameters (single/dual homed) match the cluster
2065

2066
    Any errors are signalled by raising errors.OpPrereqError.
2067

2068
    """
2069
    node_name = self.op.node_name
2070
    cfg = self.cfg
2071

    
2072
    dns_data = utils.HostInfo(node_name)
2073

    
2074
    node = dns_data.name
2075
    primary_ip = self.op.primary_ip = dns_data.ip
2076
    secondary_ip = getattr(self.op, "secondary_ip", None)
2077
    if secondary_ip is None:
2078
      secondary_ip = primary_ip
2079
    if not utils.IsValidIP(secondary_ip):
2080
      raise errors.OpPrereqError("Invalid secondary IP given")
2081
    self.op.secondary_ip = secondary_ip
2082

    
2083
    node_list = cfg.GetNodeList()
2084
    if not self.op.readd and node in node_list:
2085
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2086
                                 node)
2087
    elif self.op.readd and node not in node_list:
2088
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2089

    
2090
    for existing_node_name in node_list:
2091
      existing_node = cfg.GetNodeInfo(existing_node_name)
2092

    
2093
      if self.op.readd and node == existing_node_name:
2094
        if (existing_node.primary_ip != primary_ip or
2095
            existing_node.secondary_ip != secondary_ip):
2096
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2097
                                     " address configuration as before")
2098
        continue
2099

    
2100
      if (existing_node.primary_ip == primary_ip or
2101
          existing_node.secondary_ip == primary_ip or
2102
          existing_node.primary_ip == secondary_ip or
2103
          existing_node.secondary_ip == secondary_ip):
2104
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2105
                                   " existing node %s" % existing_node.name)
2106

    
2107
    # check that the type of the node (single versus dual homed) is the
2108
    # same as for the master
2109
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2110
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2111
    newbie_singlehomed = secondary_ip == primary_ip
2112
    if master_singlehomed != newbie_singlehomed:
2113
      if master_singlehomed:
2114
        raise errors.OpPrereqError("The master has no private ip but the"
2115
                                   " new node has one")
2116
      else:
2117
        raise errors.OpPrereqError("The master has a private ip but the"
2118
                                   " new node doesn't have one")
2119

    
2120
    # checks reachability
2121
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2122
      raise errors.OpPrereqError("Node not reachable by ping")
2123

    
2124
    if not newbie_singlehomed:
2125
      # check reachability from my secondary ip to newbie's secondary ip
2126
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2127
                           source=myself.secondary_ip):
2128
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2129
                                   " based ping to noded port")
2130

    
2131
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2132
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2133
    master_candidate = mc_now < cp_size
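    # Example with hypothetical numbers: with candidate_pool_size=10 and
    # currently three master candidates, mc_now < cp_size holds and the
    # node being added becomes a master candidate; with ten or more
    # candidates already present it is added as a normal node.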
2134

    
2135
    self.new_node = objects.Node(name=node,
2136
                                 primary_ip=primary_ip,
2137
                                 secondary_ip=secondary_ip,
2138
                                 master_candidate=master_candidate,
2139
                                 offline=False, drained=False)
2140

    
2141
  def Exec(self, feedback_fn):
2142
    """Adds the new node to the cluster.
2143

2144
    """
2145
    new_node = self.new_node
2146
    node = new_node.name
2147

    
2148
    # check connectivity
2149
    result = self.rpc.call_version([node])[node]
2150
    result.Raise()
2151
    if result.data:
2152
      if constants.PROTOCOL_VERSION == result.data:
2153
        logging.info("Communication to node %s fine, sw version %s match",
2154
                     node, result.data)
2155
      else:
2156
        raise errors.OpExecError("Version mismatch master version %s,"
2157
                                 " node version %s" %
2158
                                 (constants.PROTOCOL_VERSION, result.data))
2159
    else:
2160
      raise errors.OpExecError("Cannot get version from the new node")
2161

    
2162
    # setup ssh on node
2163
    logging.info("Copy ssh key to node %s", node)
2164
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2165
    keyarray = []
2166
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2167
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2168
                priv_key, pub_key]
2169

    
2170
    for i in keyfiles:
2171
      f = open(i, 'r')
2172
      try:
2173
        keyarray.append(f.read())
2174
      finally:
2175
        f.close()
2176

    
2177
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2178
                                    keyarray[2],
2179
                                    keyarray[3], keyarray[4], keyarray[5])
2180

    
2181
    msg = result.RemoteFailMsg()
2182
    if msg:
2183
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2184
                               " new node: %s" % msg)
2185

    
2186
    # Add node to our /etc/hosts, and add key to known_hosts
2187
    utils.AddHostToEtcHosts(new_node.name)
2188

    
2189
    if new_node.secondary_ip != new_node.primary_ip:
2190
      result = self.rpc.call_node_has_ip_address(new_node.name,
2191
                                                 new_node.secondary_ip)
2192
      if result.failed or not result.data:
2193
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2194
                                 " you gave (%s). Please fix and re-run this"
2195
                                 " command." % new_node.secondary_ip)
2196

    
2197
    node_verify_list = [self.cfg.GetMasterNode()]
2198
    node_verify_param = {
2199
      'nodelist': [node],
2200
      # TODO: do a node-net-test as well?
2201
    }
2202

    
2203
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2204
                                       self.cfg.GetClusterName())
2205
    for verifier in node_verify_list:
2206
      if result[verifier].failed or not result[verifier].data:
2207
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2208
                                 " for remote verification" % verifier)
2209
      if result[verifier].data['nodelist']:
2210
        for failed in result[verifier].data['nodelist']:
2211
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2212
                      (verifier, result[verifier].data['nodelist'][failed]))
2213
        raise errors.OpExecError("ssh/hostname verification failed.")
2214

    
2215
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2216
    # including the node just added
2217
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2218
    dist_nodes = self.cfg.GetNodeList()
2219
    if not self.op.readd:
2220
      dist_nodes.append(node)
2221
    if myself.name in dist_nodes:
2222
      dist_nodes.remove(myself.name)
2223

    
2224
    logging.debug("Copying hosts and known_hosts to all nodes")
2225
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2226
      result = self.rpc.call_upload_file(dist_nodes, fname)
2227
      for to_node, to_result in result.iteritems():
2228
        if to_result.failed or not to_result.data:
2229
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2230

    
2231
    to_copy = []
2232
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2233
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2234
      to_copy.append(constants.VNC_PASSWORD_FILE)
2235

    
2236
    for fname in to_copy:
2237
      result = self.rpc.call_upload_file([node], fname)
2238
      if result[node].failed or not result[node]:
2239
        logging.error("Could not copy file %s to node %s", fname, node)
2240

    
2241
    if self.op.readd:
2242
      self.context.ReaddNode(new_node)
2243
    else:
2244
      self.context.AddNode(new_node)
2245

    
2246

    
2247
class LUSetNodeParams(LogicalUnit):
2248
  """Modifies the parameters of a node.
2249

2250
  """
2251
  HPATH = "node-modify"
2252
  HTYPE = constants.HTYPE_NODE
2253
  _OP_REQP = ["node_name"]
2254
  REQ_BGL = False
2255

    
2256
  def CheckArguments(self):
2257
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2258
    if node_name is None:
2259
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2260
    self.op.node_name = node_name
2261
    _CheckBooleanOpField(self.op, 'master_candidate')
2262
    _CheckBooleanOpField(self.op, 'offline')
2263
    _CheckBooleanOpField(self.op, 'drained')
2264
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2265
    if all_mods.count(None) == 3:
2266
      raise errors.OpPrereqError("Please pass at least one modification")
2267
    if all_mods.count(True) > 1:
2268
      raise errors.OpPrereqError("Can't set the node into more than one"
2269
                                 " state at the same time")
2270

    
2271
  def ExpandNames(self):
2272
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2273

    
2274
  def BuildHooksEnv(self):
2275
    """Build hooks env.
2276

2277
    This runs on the master node.
2278

2279
    """
2280
    env = {
2281
      "OP_TARGET": self.op.node_name,
2282
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2283
      "OFFLINE": str(self.op.offline),
2284
      "DRAINED": str(self.op.drained),
2285
      }
2286
    nl = [self.cfg.GetMasterNode(),
2287
          self.op.node_name]
2288
    return env, nl, nl
2289

    
2290
  def CheckPrereq(self):
2291
    """Check prerequisites.
2292

2293
    This only checks the instance list against the existing names.
2294

2295
    """
2296
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2297

    
2298
    if ((self.op.master_candidate == False or self.op.offline == True or
2299
         self.op.drained == True) and node.master_candidate):
2300
      # we will demote the node from master_candidate
2301
      if self.op.node_name == self.cfg.GetMasterNode():
2302
        raise errors.OpPrereqError("The master node has to be a"
2303
                                   " master candidate, online and not drained")
2304
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2305
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2306
      if num_candidates <= cp_size:
2307
        msg = ("Not enough master candidates (desired"
2308
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2309
        if self.op.force:
2310
          self.LogWarning(msg)
2311
        else:
2312
          raise errors.OpPrereqError(msg)
2313

    
2314
    if (self.op.master_candidate == True and
2315
        ((node.offline and not self.op.offline == False) or
2316
         (node.drained and not self.op.drained == False))):
2317
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2318
                                 " to master_candidate")
2319

    
2320
    return
2321

    
2322
  def Exec(self, feedback_fn):
2323
    """Modifies a node.
2324

2325
    """
2326
    node = self.node
2327

    
2328
    result = []
2329
    changed_mc = False
2330

    
2331
    if self.op.offline is not None:
2332
      node.offline = self.op.offline
2333
      result.append(("offline", str(self.op.offline)))
2334
      if self.op.offline == True:
2335
        if node.master_candidate:
2336
          node.master_candidate = False
2337
          changed_mc = True
2338
          result.append(("master_candidate", "auto-demotion due to offline"))
2339
        if node.drained:
2340
          node.drained = False
2341
          result.append(("drained", "clear drained status due to offline"))
2342

    
2343
    if self.op.master_candidate is not None:
2344
      node.master_candidate = self.op.master_candidate
2345
      changed_mc = True
2346
      result.append(("master_candidate", str(self.op.master_candidate)))
2347
      if self.op.master_candidate == False:
2348
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2349
        msg = rrc.RemoteFailMsg()
2350
        if msg:
2351
          self.LogWarning("Node failed to demote itself: %s" % msg)
2352

    
2353
    if self.op.drained is not None:
2354
      node.drained = self.op.drained
2355
      result.append(("drained", str(self.op.drained)))
2356
      if self.op.drained == True:
2357
        if node.master_candidate:
2358
          node.master_candidate = False
2359
          changed_mc = True
2360
          result.append(("master_candidate", "auto-demotion due to drain"))
2361
        if node.offline:
2362
          node.offline = False
2363
          result.append(("offline", "clear offline status due to drain"))
2364

    
2365
    # this will trigger configuration file update, if needed
2366
    self.cfg.Update(node)
2367
    # this will trigger job queue propagation or cleanup
2368
    if changed_mc:
2369
      self.context.ReaddNode(node)
2370

    
2371
    return result
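    # The returned list pairs each changed attribute with a description;
    # e.g. (hypothetical run) offlining a node that was a master candidate
    # yields [("offline", "True"),
    #         ("master_candidate", "auto-demotion due to offline")].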
2372

    
2373

    
2374
class LUQueryClusterInfo(NoHooksLU):
2375
  """Query cluster configuration.
2376

2377
  """
2378
  _OP_REQP = []
2379
  REQ_BGL = False
2380

    
2381
  def ExpandNames(self):
2382
    self.needed_locks = {}
2383

    
2384
  def CheckPrereq(self):
2385
    """No prerequsites needed for this LU.
2386

2387
    """
2388
    pass
2389

    
2390
  def Exec(self, feedback_fn):
2391
    """Return cluster config.
2392

2393
    """
2394
    cluster = self.cfg.GetClusterInfo()
2395
    result = {
2396
      "software_version": constants.RELEASE_VERSION,
2397
      "protocol_version": constants.PROTOCOL_VERSION,
2398
      "config_version": constants.CONFIG_VERSION,
2399
      "os_api_version": constants.OS_API_VERSION,
2400
      "export_version": constants.EXPORT_VERSION,
2401
      "architecture": (platform.architecture()[0], platform.machine()),
2402
      "name": cluster.cluster_name,
2403
      "master": cluster.master_node,
2404
      "default_hypervisor": cluster.default_hypervisor,
2405
      "enabled_hypervisors": cluster.enabled_hypervisors,
2406
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2407
                        for hypervisor in cluster.enabled_hypervisors]),
2408
      "beparams": cluster.beparams,
2409
      "candidate_pool_size": cluster.candidate_pool_size,
2410
      }
2411

    
2412
    return result
2413

    
2414

    
2415
class LUQueryConfigValues(NoHooksLU):
2416
  """Return configuration values.
2417

2418
  """
2419
  _OP_REQP = []
2420
  REQ_BGL = False
2421
  _FIELDS_DYNAMIC = utils.FieldSet()
2422
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2423

    
2424
  def ExpandNames(self):
2425
    self.needed_locks = {}
2426

    
2427
    _CheckOutputFields(static=self._FIELDS_STATIC,
2428
                       dynamic=self._FIELDS_DYNAMIC,
2429
                       selected=self.op.output_fields)
2430

    
2431
  def CheckPrereq(self):
2432
    """No prerequisites.
2433

2434
    """
2435
    pass
2436

    
2437
  def Exec(self, feedback_fn):
2438
    """Dump a representation of the cluster config to the standard output.
2439

2440
    """
2441
    values = []
2442
    for field in self.op.output_fields:
2443
      if field == "cluster_name":
2444
        entry = self.cfg.GetClusterName()
2445
      elif field == "master_node":
2446
        entry = self.cfg.GetMasterNode()
2447
      elif field == "drain_flag":
2448
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2449
      else:
2450
        raise errors.ParameterError(field)
2451
      values.append(entry)
2452
    return values
2453

    
2454

    
2455
class LUActivateInstanceDisks(NoHooksLU):
2456
  """Bring up an instance's disks.
2457

2458
  """
2459
  _OP_REQP = ["instance_name"]
2460
  REQ_BGL = False
2461

    
2462
  def ExpandNames(self):
2463
    self._ExpandAndLockInstance()
2464
    self.needed_locks[locking.LEVEL_NODE] = []
2465
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2466

    
2467
  def DeclareLocks(self, level):
2468
    if level == locking.LEVEL_NODE:
2469
      self._LockInstancesNodes()
2470

    
2471
  def CheckPrereq(self):
2472
    """Check prerequisites.
2473

2474
    This checks that the instance is in the cluster.
2475

2476
    """
2477
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2478
    assert self.instance is not None, \
2479
      "Cannot retrieve locked instance %s" % self.op.instance_name
2480
    _CheckNodeOnline(self, self.instance.primary_node)
2481

    
2482
  def Exec(self, feedback_fn):
2483
    """Activate the disks.
2484

2485
    """
2486
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2487
    if not disks_ok:
2488
      raise errors.OpExecError("Cannot activate block devices")
2489

    
2490
    return disks_info
2491

    
2492

    
2493
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2494
  """Prepare the block devices for an instance.
2495

2496
  This sets up the block devices on all nodes.
2497

2498
  @type lu: L{LogicalUnit}
2499
  @param lu: the logical unit on whose behalf we execute
2500
  @type instance: L{objects.Instance}
2501
  @param instance: the instance for whose disks we assemble
2502
  @type ignore_secondaries: boolean
2503
  @param ignore_secondaries: if true, errors on secondary nodes
2504
      won't result in an error return from the function
2505
  @return: a tuple of (disks_ok, device_info), where device_info is a
      list of (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices
2508

2509
  """
2510
  device_info = []
2511
  disks_ok = True
2512
  iname = instance.name
2513
  # With the two passes mechanism we try to reduce the window of
2514
  # opportunity for the race condition of switching DRBD to primary
2515
  # before handshaking occurred, but we do not eliminate it
2516

    
2517
  # The proper fix would be to wait (with some limits) until the
2518
  # connection has been made and drbd transitions from WFConnection
2519
  # into any other network-connected state (Connected, SyncTarget,
2520
  # SyncSource, etc.)
2521

    
2522
  # 1st pass, assemble on all nodes in secondary mode
2523
  for inst_disk in instance.disks:
2524
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2525
      lu.cfg.SetDiskID(node_disk, node)
2526
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2527
      msg = result.RemoteFailMsg()
2528
      if msg:
2529
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2530
                           " (is_primary=False, pass=1): %s",
2531
                           inst_disk.iv_name, node, msg)
2532
        if not ignore_secondaries:
2533
          disks_ok = False
2534

    
2535
  # FIXME: race condition on drbd migration to primary
2536

    
2537
  # 2nd pass, do only the primary node
2538
  for inst_disk in instance.disks:
2539
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2540
      if node != instance.primary_node:
2541
        continue
2542
      lu.cfg.SetDiskID(node_disk, node)
2543
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2544
      msg = result.RemoteFailMsg()
2545
      if msg:
2546
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2547
                           " (is_primary=True, pass=2): %s",
2548
                           inst_disk.iv_name, node, msg)
2549
        disks_ok = False
2550
    device_info.append((instance.primary_node, inst_disk.iv_name,
2551
                        result.payload))
2552

    
2553
  # leave the disks configured for the primary node
2554
  # this is a workaround that would be fixed better by
2555
  # improving the logical/physical id handling
2556
  for disk in instance.disks:
2557
    lu.cfg.SetDiskID(disk, instance.primary_node)
2558

    
2559
  return disks_ok, device_info
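  # The device_info list built above contains one entry per disk, for the
  # primary node only, e.g. (hypothetical values):
  #   [("node1.example.com", "disk/0", "/dev/drbd0"),
  #    ("node1.example.com", "disk/1", "/dev/drbd1")]
  # where the third element is the payload returned by blockdev_assemble.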
2560

    
2561

    
2562
def _StartInstanceDisks(lu, instance, force):
2563
  """Start the disks of an instance.
2564

2565
  """
2566
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2567
                                           ignore_secondaries=force)
2568
  if not disks_ok:
2569
    _ShutdownInstanceDisks(lu, instance)
2570
    if force is not None and not force:
2571
      lu.proc.LogWarning("", hint="If the message above refers to a"
2572
                         " secondary node,"
2573
                         " you can retry the operation using '--force'.")
2574
    raise errors.OpExecError("Disk consistency error")
2575

    
2576

    
2577
class LUDeactivateInstanceDisks(NoHooksLU):
2578
  """Shutdown an instance's disks.
2579

2580
  """
2581
  _OP_REQP = ["instance_name"]
2582
  REQ_BGL = False
2583

    
2584
  def ExpandNames(self):
2585
    self._ExpandAndLockInstance()
2586
    self.needed_locks[locking.LEVEL_NODE] = []
2587
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2588

    
2589
  def DeclareLocks(self, level):
2590
    if level == locking.LEVEL_NODE:
2591
      self._LockInstancesNodes()
2592

    
2593
  def CheckPrereq(self):
2594
    """Check prerequisites.
2595

2596
    This checks that the instance is in the cluster.
2597

2598
    """
2599
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2600
    assert self.instance is not None, \
2601
      "Cannot retrieve locked instance %s" % self.op.instance_name
2602

    
2603
  def Exec(self, feedback_fn):
2604
    """Deactivate the disks
2605

2606
    """
2607
    instance = self.instance
2608
    _SafeShutdownInstanceDisks(self, instance)
2609

    
2610

    
2611
def _SafeShutdownInstanceDisks(lu, instance):
2612
  """Shutdown block devices of an instance.
2613

2614
  This function checks if an instance is running, before calling
2615
  _ShutdownInstanceDisks.
2616

2617
  """
2618
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2619
                                      [instance.hypervisor])
2620
  ins_l = ins_l[instance.primary_node]
2621
  if ins_l.failed or not isinstance(ins_l.data, list):
2622
    raise errors.OpExecError("Can't contact node '%s'" %
2623
                             instance.primary_node)
2624

    
2625
  if instance.name in ins_l.data:
2626
    raise errors.OpExecError("Instance is running, can't shutdown"
2627
                             " block devices.")
2628

    
2629
  _ShutdownInstanceDisks(lu, instance)
2630

    
2631

    
2632
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2633
  """Shutdown block devices of an instance.
2634

2635
  This does the shutdown on all nodes of the instance.
2636

2637
  If ignore_primary is true, errors on the primary node are
  ignored.
2639

2640
  """
2641
  all_result = True
2642
  for disk in instance.disks:
2643
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2644
      lu.cfg.SetDiskID(top_disk, node)
2645
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2646
      msg = result.RemoteFailMsg()
2647
      if msg:
2648
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2649
                      disk.iv_name, node, msg)
2650
        if not ignore_primary or node != instance.primary_node:
2651
          all_result = False
2652
  return all_result
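  # Minimal usage sketch (hypothetical caller): an operation that tolerates
  # a dead primary node passes ignore_primary=True, so shutdown errors on
  # that node are only logged and do not flip the return value to False:
  #
  #   if not _ShutdownInstanceDisks(lu, instance, ignore_primary=True):
  #     lu.proc.LogWarning("Could not shut down all of the instance's disks")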
2653

    
2654

    
2655
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2656
  """Checks if a node has enough free memory.
2657

2658
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2662

2663
  @type lu: C{LogicalUnit}
2664
  @param lu: a logical unit from which we get configuration data
2665
  @type node: C{str}
2666
  @param node: the node to check
2667
  @type reason: C{str}
2668
  @param reason: string to use in the error message
2669
  @type requested: C{int}
2670
  @param requested: the amount of memory in MiB to check for
2671
  @type hypervisor_name: C{str}
2672
  @param hypervisor_name: the hypervisor to ask for memory stats
2673
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2674
      we cannot check the node
2675

2676
  """
2677
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2678
  nodeinfo[node].Raise()
2679
  free_mem = nodeinfo[node].data.get('memory_free')
2680
  if not isinstance(free_mem, int):
2681
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2682
                             " was '%s'" % (node, free_mem))
2683
  if requested > free_mem:
2684
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2685
                             " needed %s MiB, available %s MiB" %
2686
                             (node, reason, requested, free_mem))
2687

    
2688

    
2689
class LUStartupInstance(LogicalUnit):
2690
  """Starts an instance.
2691

2692
  """
2693
  HPATH = "instance-start"
2694
  HTYPE = constants.HTYPE_INSTANCE
2695
  _OP_REQP = ["instance_name", "force"]
2696
  REQ_BGL = False
2697

    
2698
  def ExpandNames(self):
2699
    self._ExpandAndLockInstance()
2700

    
2701
  def BuildHooksEnv(self):
2702
    """Build hooks env.
2703

2704
    This runs on master, primary and secondary nodes of the instance.
2705

2706
    """
2707
    env = {
2708
      "FORCE": self.op.force,
2709
      }
2710
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2711
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2712
    return env, nl, nl
2713

    
2714
  def CheckPrereq(self):
2715
    """Check prerequisites.
2716

2717
    This checks that the instance is in the cluster.
2718

2719
    """
2720
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2721
    assert self.instance is not None, \
2722
      "Cannot retrieve locked instance %s" % self.op.instance_name
2723

    
2724
    _CheckNodeOnline(self, instance.primary_node)
2725

    
2726
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2727
    # check bridges existence
2728
    _CheckInstanceBridgesExist(self, instance)
2729

    
2730
    _CheckNodeFreeMemory(self, instance.primary_node,
2731
                         "starting instance %s" % instance.name,
2732
                         bep[constants.BE_MEMORY], instance.hypervisor)
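    # FillBE merges the cluster-wide BE defaults with the instance's own
    # beparams, so the value checked above is the effective one; e.g.
    # (hypothetical numbers) a cluster default of 128 MiB overridden by an
    # instance-level memory setting of 512 MiB gives
    # bep[constants.BE_MEMORY] == 512.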
2733

    
2734
  def Exec(self, feedback_fn):
2735
    """Start the instance.
2736

2737
    """
2738
    instance = self.instance
2739
    force = self.op.force
2740

    
2741
    self.cfg.MarkInstanceUp(instance.name)
2742

    
2743
    node_current = instance.primary_node
2744

    
2745
    _StartInstanceDisks(self, instance, force)
2746

    
2747
    result = self.rpc.call_instance_start(node_current, instance)
2748
    msg = result.RemoteFailMsg()
2749
    if msg:
2750
      _ShutdownInstanceDisks(self, instance)
2751
      raise errors.OpExecError("Could not start instance: %s" % msg)
2752

    
2753

    
2754
class LURebootInstance(LogicalUnit):
2755
  """Reboot an instance.
2756

2757
  """
2758
  HPATH = "instance-reboot"
2759
  HTYPE = constants.HTYPE_INSTANCE
2760
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2761
  REQ_BGL = False
2762

    
2763
  def ExpandNames(self):
2764
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2765
                                   constants.INSTANCE_REBOOT_HARD,
2766
                                   constants.INSTANCE_REBOOT_FULL]:
2767
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2768
                                  (constants.INSTANCE_REBOOT_SOFT,
2769
                                   constants.INSTANCE_REBOOT_HARD,
2770
                                   constants.INSTANCE_REBOOT_FULL))
2771
    self._ExpandAndLockInstance()
2772

    
2773
  def BuildHooksEnv(self):
2774
    """Build hooks env.
2775

2776
    This runs on master, primary and secondary nodes of the instance.
2777

2778
    """
2779
    env = {
2780
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2781
      "REBOOT_TYPE": self.op.reboot_type,
2782
      }
2783
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2784
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2785
    return env, nl, nl
2786

    
2787
  def CheckPrereq(self):
2788
    """Check prerequisites.
2789

2790
    This checks that the instance is in the cluster.
2791

2792
    """
2793
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2794
    assert self.instance is not None, \
2795
      "Cannot retrieve locked instance %s" % self.op.instance_name
2796

    
2797
    _CheckNodeOnline(self, instance.primary_node)
2798

    
2799
    # check bridges existence
2800
    _CheckInstanceBridgesExist(self, instance)
2801

    
2802
  def Exec(self, feedback_fn):
2803
    """Reboot the instance.
2804

2805
    """
2806
    instance = self.instance
2807
    ignore_secondaries = self.op.ignore_secondaries
2808
    reboot_type = self.op.reboot_type
2809

    
2810
    node_current = instance.primary_node
2811

    
2812
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2813
                       constants.INSTANCE_REBOOT_HARD]:
2814
      for disk in instance.disks:
2815
        self.cfg.SetDiskID(disk, node_current)
2816
      result = self.rpc.call_instance_reboot(node_current, instance,
2817
                                             reboot_type)
2818
      msg = result.RemoteFailMsg()
2819
      if msg:
2820
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2821
    else:
2822
      result = self.rpc.call_instance_shutdown(node_current, instance)
2823
      msg = result.RemoteFailMsg()
2824
      if msg:
2825
        raise errors.OpExecError("Could not shutdown instance for"
2826
                                 " full reboot: %s" % msg)
2827
      _ShutdownInstanceDisks(self, instance)
2828
      _StartInstanceDisks(self, instance, ignore_secondaries)
2829
      result = self.rpc.call_instance_start(node_current, instance)
2830
      msg = result.RemoteFailMsg()
2831
      if msg:
2832
        _ShutdownInstanceDisks(self, instance)
2833
        raise errors.OpExecError("Could not start instance for"
2834
                                 " full reboot: %s" % msg)
2835

    
2836
    self.cfg.MarkInstanceUp(instance.name)
2837

    
2838

    
2839
class LUShutdownInstance(LogicalUnit):
2840
  """Shutdown an instance.
2841

2842
  """
2843
  HPATH = "instance-stop"
2844
  HTYPE = constants.HTYPE_INSTANCE
2845
  _OP_REQP = ["instance_name"]
2846
  REQ_BGL = False
2847

    
2848
  def ExpandNames(self):
2849
    self._ExpandAndLockInstance()
2850

    
2851
  def BuildHooksEnv(self):
2852
    """Build hooks env.
2853

2854
    This runs on master, primary and secondary nodes of the instance.
2855

2856
    """
2857
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2858
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2859
    return env, nl, nl
2860

    
2861
  def CheckPrereq(self):
2862
    """Check prerequisites.
2863

2864
    This checks that the instance is in the cluster.
2865

2866
    """
2867
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2868
    assert self.instance is not None, \
2869
      "Cannot retrieve locked instance %s" % self.op.instance_name
2870
    _CheckNodeOnline(self, self.instance.primary_node)
2871

    
2872
  def Exec(self, feedback_fn):
2873
    """Shutdown the instance.
2874

2875
    """
2876
    instance = self.instance
2877
    node_current = instance.primary_node
2878
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

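    # The instance is down, so its disks have to be activated explicitly
    # before the OS create scripts can run; they are shut down again in
    # the finally clause below.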
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

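    # For file-based instances the storage directory is named after the
    # instance, so it has to be renamed on the primary node as well.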
    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
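    # The instance is gone from the configuration, so ask the processor
    # to drop its lock once this LU finishes.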
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

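    # Live (dynamic) data is only needed if at least one requested field
    # is not static; node-level locking is only used when we gather live
    # data and the caller explicitly asked for locking.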
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
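        # The status field folds admin and operational state into one
        # value: running / ERROR_up / ERROR_down / ADMIN_down, plus
        # ERROR_nodedown and ERROR_nodeoffline when the primary node
        # cannot be queried.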
        elif field == "status":
3287
          if instance.primary_node in off_nodes:
3288
            val = "ERROR_nodeoffline"
3289
          elif instance.primary_node in bad_nodes:
3290
            val = "ERROR_nodedown"
3291
          else:
3292
            running = bool(live_data.get(instance.name))
3293
            if running:
3294
              if instance.admin_up:
3295
                val = "running"
3296
              else:
3297
                val = "ERROR_up"
3298
            else:
3299
              if instance.admin_up:
3300
                val = "ERROR_down"
3301
              else:
3302
                val = "ADMIN_down"
3303
        elif field == "oper_ram":
3304
          if instance.primary_node in bad_nodes:
3305
            val = None
3306
          elif instance.name in live_data:
3307
            val = live_data[instance.name].get("memory", "?")
3308
          else:
3309
            val = "-"
3310
        elif field == "disk_template":
3311
          val = instance.disk_template
3312
        elif field == "ip":
3313
          val = instance.nics[0].ip
3314
        elif field == "bridge":
3315
          val = instance.nics[0].bridge
3316
        elif field == "mac":
3317
          val = instance.nics[0].mac
3318
        elif field == "sda_size" or field == "sdb_size":
3319
          idx = ord(field[2]) - ord('a')
3320
          try:
3321
            val = instance.FindDisk(idx).size
3322
          except errors.OpPrereqError:
3323
            val = None
3324
        elif field == "disk_usage": # total disk usage per node
3325
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3326
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3327
        elif field == "tags":
3328
          val = list(instance.GetTags())
3329
        elif field == "serial_no":
3330
          val = instance.serial_no
3331
        elif field == "network_port":
3332
          val = instance.network_port
3333
        elif field == "hypervisor":
3334
          val = instance.hypervisor
3335
        elif field == "hvparams":
3336
          val = i_hv
3337
        elif (field.startswith(HVPREFIX) and
3338
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3339
          val = i_hv.get(field[len(HVPREFIX):], None)
3340
        elif field == "beparams":
3341
          val = i_be
3342
        elif (field.startswith(BEPREFIX) and
3343
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3344
          val = i_be.get(field[len(BEPREFIX):], None)
3345
        elif st_match and st_match.groups():
3346
          # matches a variable list
3347
          st_groups = st_match.groups()
3348
          if st_groups and st_groups[0] == "disk":
3349
            if st_groups[1] == "count":
3350
              val = len(instance.disks)
3351
            elif st_groups[1] == "sizes":
3352
              val = [disk.size for disk in instance.disks]
3353
            elif st_groups[1] == "size":
3354
              try:
3355
                val = instance.FindDisk(st_groups[2]).size
3356
              except errors.OpPrereqError:
3357
                val = None
3358
            else:
3359
              assert False, "Unhandled disk parameter"
3360
          elif st_groups[0] == "nic":
3361
            if st_groups[1] == "count":
3362
              val = len(instance.nics)
3363
            elif st_groups[1] == "macs":
3364
              val = [nic.mac for nic in instance.nics]
3365
            elif st_groups[1] == "ips":
3366
              val = [nic.ip for nic in instance.nics]
3367
            elif st_groups[1] == "bridges":
3368
              val = [nic.bridge for nic in instance.nics]
3369
            else:
3370
              # index-based item
3371
              nic_idx = int(st_groups[2])
3372
              if nic_idx >= len(instance.nics):
3373
                val = None
3374
              else:
3375
                if st_groups[1] == "mac":
3376
                  val = instance.nics[nic_idx].mac
3377
                elif st_groups[1] == "ip":
3378
                  val = instance.nics[nic_idx].ip
3379
                elif st_groups[1] == "bridge":
3380
                  val = instance.nics[nic_idx].bridge
3381
                else:
3382
                  assert False, "Unhandled NIC parameter"
3383
          else:
3384
            assert False, "Unhandled variable parameter"
3385
        else:
3386
          raise errors.ParameterError(field)
3387
        iout.append(val)
3388
      output.append(iout)
3389

    
3390
    return output
3391

    
3392

    
3393
class LUFailoverInstance(LogicalUnit):
3394
  """Failover an instance.
3395

3396
  """
3397
  HPATH = "instance-failover"
3398
  HTYPE = constants.HTYPE_INSTANCE
3399
  _OP_REQP = ["instance_name", "ignore_consistency"]
3400
  REQ_BGL = False
3401

    
3402
  def ExpandNames(self):
3403
    self._ExpandAndLockInstance()
3404
    self.needed_locks[locking.LEVEL_NODE] = []
3405
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3406

    
3407
  def DeclareLocks(self, level):
3408
    if level == locking.LEVEL_NODE:
3409
      self._LockInstancesNodes()
3410

    
3411
  def BuildHooksEnv(self):
3412
    """Build hooks env.
3413

3414
    This runs on master, primary and secondary nodes of the instance.
3415

3416
    """
3417
    env = {
3418
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3419
      }
3420
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3421
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3422
    return env, nl, nl
3423

    
3424
  def CheckPrereq(self):
3425
    """Check prerequisites.
3426

3427
    This checks that the instance is in the cluster.
3428

3429
    """
3430
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3431
    assert self.instance is not None, \
3432
      "Cannot retrieve locked instance %s" % self.op.instance_name
3433

    
3434
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3435
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3436
      raise errors.OpPrereqError("Instance's disk layout is not"
3437
                                 " network mirrored, cannot failover.")
3438

    
3439
    secondary_nodes = instance.secondary_nodes
3440
    if not secondary_nodes:
3441
      raise errors.ProgrammerError("no secondary node but using "
3442
                                   "a mirrored disk template")
3443

    
3444
    target_node = secondary_nodes[0]
3445
    _CheckNodeOnline(self, target_node)
3446
    _CheckNodeNotDrained(self, target_node)
3447
    # check memory requirements on the secondary node
3448
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3449
                         instance.name, bep[constants.BE_MEMORY],
3450
                         instance.hypervisor)
3451

    
3452
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
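    # ignore_primary tolerates errors on the (possibly dead) primary node;
    # a failure to shut down the disks on the secondary is still fatal.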
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

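    # Whatever the outcome, bring the disks back to a clean single-master
    # setup: demote the non-primary side, then cycle through standalone
    # and reconnect so that both ends agree again.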
    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
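    # Both the DRBD RPCs and the hypervisor migration above use the
    # nodes' secondary IPs, i.e. the replication network.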
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
3970
    device.physical_id = result.payload
3971

    
3972

    
3973
def _GenerateUniqueNames(lu, exts):
3974
  """Generate a suitable LV name.
3975

3976
  This will generate a logical volume name for the given instance.
3977

3978
  """
3979
  results = []
3980
  for val in exts:
3981
    new_id = lu.cfg.GenerateUniqueID()
3982
    results.append("%s%s" % (new_id, val))
3983
  return results
3984

    
3985

    
3986
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3987
                         p_minor, s_minor):
3988
  """Generate a drbd8 device complete with its children.
3989

3990
  """
3991
  port = lu.cfg.AllocatePort()
3992
  vgname = lu.cfg.GetVGName()
3993
  shared_secret = lu.cfg.GenerateDRBDSecret()
3994
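  # The DRBD8 device is stacked on top of two LVs: the data volume and a
  # small (128 MB) metadata volume.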
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """