#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


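# Illustrative sketch, not part of the original module: a minimal
# concurrent LU following the contract described in the LogicalUnit
# docstring above (ExpandNames/CheckPrereq/Exec, REQ_BGL=False). The
# opcode it would serve and its "node_name" parameter are hypothetical.
#
#   class LUExampleNodeQuery(NoHooksLU):
#     _OP_REQP = ["node_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # canonicalize the name and declare a shared lock on that node only
#       self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
#       if self.op.node_name is None:
#         raise errors.OpPrereqError("Node not known")
#       self.needed_locks = {locking.LEVEL_NODE: [self.op.node_name]}
#       self.share_locks[locking.LEVEL_NODE] = 1
#
#     def CheckPrereq(self):
#       _CheckNodeOnline(self, self.op.node_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Querying %s" % self.op.node_name)
#       return self.cfg.GetNodeInfo(self.op.node_name)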
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


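# Illustrative usage sketch, not part of the original module: a query-type
# LU would typically validate its requested output fields in CheckPrereq
# with a call like the one below. The field names are hypothetical, and
# the FieldSet-with-names constructor is assumed to be equivalent to
# building an empty utils.FieldSet() and extending it.
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)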
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env


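# Worked example, not part of the original module (all values hypothetical):
# _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", True,
#                       128, 1, [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")],
#                       "drbd", [(10240, "rw")])
# returns a dict containing, among others: INSTANCE_NAME=inst1,
# INSTANCE_PRIMARY=node1, INSTANCE_SECONDARIES=node2, INSTANCE_STATUS=up,
# INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_BRIDGE=xen-br0, INSTANCE_DISK_COUNT=1,
# INSTANCE_DISK0_SIZE=10240 and INSTANCE_DISK0_MODE=rw.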
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

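  # Worked example for the check above, not part of the original module
  # (numbers hypothetical): if node B is secondary for two auto-balanced
  # instances whose primary is node A, with BE_MEMORY of 2048 and 1024,
  # then needed_mem for the (B, A) pair is 3072; if B reports
  # mfree = 2560, node B is flagged as not N+1 compliant.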
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


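# Clarifying note, not part of the original module: the result tuple
# returned by LUVerifyDisks.Exec above is (res_nodes, res_nlvm,
# res_instances, res_missing) - nodes that could not be queried, per-node
# LVM error strings, instances with inactive logical volumes, and a map
# of instance name to its missing (node, volume) pairs.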
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


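# Clarifying note, not part of the original module: for a DRBD disk whose
# children are LV-backed devices the recursion above returns True through
# the children, while a file-based disk with no children falls through to
# the dev_type comparison and returns False.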
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1531

    
1532
    self.cfg.Update(self.cluster)
1533

    
1534
    # we want to update nodes after the cluster so that if any errors
1535
    # happen, we have recorded and saved the cluster info
1536
    if self.op.candidate_pool_size is not None:
1537
      _AdjustCandidatePool(self)
1538

    
1539

    
1540
class LURedistributeConfig(NoHooksLU):
1541
  """Force the redistribution of cluster configuration.
1542

1543
  This is a very simple LU.
1544

1545
  """
1546
  _OP_REQP = []
1547
  REQ_BGL = False
1548

    
1549
  def ExpandNames(self):
1550
    self.needed_locks = {
1551
      locking.LEVEL_NODE: locking.ALL_SET,
1552
    }
1553
    self.share_locks[locking.LEVEL_NODE] = 1
1554

    
1555
  def CheckPrereq(self):
1556
    """Check prerequisites.
1557

1558
    """
1559

    
1560
  def Exec(self, feedback_fn):
1561
    """Redistribute the configuration.
1562

1563
    """
1564
    self.cfg.Update(self.cfg.GetClusterInfo())
1565

    
1566

    
1567
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1568
  """Sleep and poll for an instance's disk to sync.
1569

1570
  """
1571
  if not instance.disks:
1572
    return True
1573

    
1574
  if not oneshot:
1575
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1576

    
1577
  node = instance.primary_node
1578

    
1579
  for dev in instance.disks:
1580
    lu.cfg.SetDiskID(dev, node)
1581

    
1582
  retries = 0
1583
  while True:
1584
    max_time = 0
1585
    done = True
1586
    cumul_degraded = False
1587
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1588
    if rstats.failed or not rstats.data:
1589
      lu.LogWarning("Can't get any data from node %s", node)
1590
      retries += 1
1591
      if retries >= 10:
1592
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1593
                                 " aborting." % node)
1594
      time.sleep(6)
1595
      continue
1596
    rstats = rstats.data
1597
    retries = 0
1598
    for i, mstat in enumerate(rstats):
1599
      if mstat is None:
1600
        lu.LogWarning("Can't compute data for node %s/%s",
1601
                           node, instance.disks[i].iv_name)
1602
        continue
1603
      # we ignore the ldisk parameter
1604
      perc_done, est_time, is_degraded, _ = mstat
1605
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1606
      if perc_done is not None:
1607
        done = False
1608
        if est_time is not None:
1609
          rem_time = "%d estimated seconds remaining" % est_time
1610
          max_time = est_time
1611
        else:
1612
          rem_time = "no time estimate"
1613
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1614
                        (instance.disks[i].iv_name, perc_done, rem_time))
1615
    if done or oneshot:
1616
      break
1617

    
1618
    time.sleep(min(60, max_time))
1619

    
1620
  if done:
1621
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1622
  return not cumul_degraded
1623

    
1624

    
1625
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1626
  """Check that mirrors are not degraded.
1627

1628
  The ldisk parameter, if True, will change the test from the
1629
  is_degraded attribute (which represents overall non-ok status for
1630
  the device(s)) to the ldisk (representing the local storage status).
1631

1632
  """
1633
  lu.cfg.SetDiskID(dev, node)
1634
  if ldisk:
1635
    idx = 6
1636
  else:
1637
    idx = 5
1638

    
1639
  result = True
1640
  if on_primary or dev.AssembleOnSecondary():
1641
    rstats = lu.rpc.call_blockdev_find(node, dev)
1642
    msg = rstats.RemoteFailMsg()
1643
    if msg:
1644
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1645
      result = False
1646
    elif not rstats.payload:
1647
      lu.LogWarning("Can't find disk on node %s", node)
1648
      result = False
1649
    else:
1650
      result = result and (not rstats.payload[idx])
1651
  if dev.children:
1652
    for child in dev.children:
1653
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1654

    
1655
  return result
1656

    
1657

    
1658
class LUDiagnoseOS(NoHooksLU):
1659
  """Logical unit for OS diagnose/query.
1660

1661
  """
1662
  _OP_REQP = ["output_fields", "names"]
1663
  REQ_BGL = False
1664
  _FIELDS_STATIC = utils.FieldSet()
1665
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1666

    
1667
  def ExpandNames(self):
1668
    if self.op.names:
1669
      raise errors.OpPrereqError("Selective OS query not supported")
1670

    
1671
    _CheckOutputFields(static=self._FIELDS_STATIC,
1672
                       dynamic=self._FIELDS_DYNAMIC,
1673
                       selected=self.op.output_fields)
1674

    
1675
    # Lock all nodes, in shared mode
1676
    self.needed_locks = {}
1677
    self.share_locks[locking.LEVEL_NODE] = 1
1678
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1679

    
1680
  def CheckPrereq(self):
1681
    """Check prerequisites.
1682

1683
    """
1684

    
1685
  @staticmethod
1686
  def _DiagnoseByOS(node_list, rlist):
1687
    """Remaps a per-node return list into an a per-os per-node dictionary
1688

1689
    @param node_list: a list with the names of all nodes
1690
    @param rlist: a map with node names as keys and OS objects as values
1691

1692
    @rtype: dict
1693
    @return: a dictionary with osnames as keys and as value another map, with
1694
        nodes as keys and list of OS objects as values, eg::
1695

1696
          {"debian-etch": {"node1": [<object>,...],
1697
                           "node2": [<object>,]}
1698
          }
1699

1700
    """
1701
    all_os = {}
1702
    for node_name, nr in rlist.iteritems():
1703
      if nr.failed or not nr.data:
1704
        continue
1705
      for os_obj in nr.data:
1706
        if os_obj.name not in all_os:
1707
          # build a list of nodes for this os containing empty lists
1708
          # for each node in node_list
1709
          all_os[os_obj.name] = {}
1710
          for nname in node_list:
1711
            all_os[os_obj.name][nname] = []
1712
        all_os[os_obj.name][node_name].append(os_obj)
1713
    return all_os
1714

    
1715
  def Exec(self, feedback_fn):
1716
    """Compute the list of OSes.
1717

1718
    """
1719
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1720
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1721
                   if node in node_list]
1722
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1723
    if node_data == False:
1724
      raise errors.OpExecError("Can't gather the list of OSes")
1725
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1726
    output = []
1727
    for os_name, os_data in pol.iteritems():
1728
      row = []
1729
      for field in self.op.output_fields:
1730
        if field == "name":
1731
          val = os_name
1732
        elif field == "valid":
1733
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1734
        elif field == "node_status":
1735
          val = {}
1736
          for node_name, nos_list in os_data.iteritems():
1737
            val[node_name] = [(v.status, v.path) for v in nos_list]
1738
        else:
1739
          raise errors.ParameterError(field)
1740
        row.append(val)
1741
      output.append(row)
1742

    
1743
    return output
1744

    
1745

    
1746
class LURemoveNode(LogicalUnit):
1747
  """Logical unit for removing a node.
1748

1749
  """
1750
  HPATH = "node-remove"
1751
  HTYPE = constants.HTYPE_NODE
1752
  _OP_REQP = ["node_name"]
1753

    
1754
  def BuildHooksEnv(self):
1755
    """Build hooks env.
1756

1757
    This doesn't run on the target node in the pre phase as a failed
1758
    node would then be impossible to remove.
1759

1760
    """
1761
    env = {
1762
      "OP_TARGET": self.op.node_name,
1763
      "NODE_NAME": self.op.node_name,
1764
      }
1765
    all_nodes = self.cfg.GetNodeList()
1766
    all_nodes.remove(self.op.node_name)
1767
    return env, all_nodes, all_nodes
1768

    
1769
  def CheckPrereq(self):
1770
    """Check prerequisites.
1771

1772
    This checks:
1773
     - the node exists in the configuration
1774
     - it does not have primary or secondary instances
1775
     - it's not the master
1776

1777
    Any errors are signalled by raising errors.OpPrereqError.
1778

1779
    """
1780
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1781
    if node is None:
1782
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1783

    
1784
    instance_list = self.cfg.GetInstanceList()
1785

    
1786
    masternode = self.cfg.GetMasterNode()
1787
    if node.name == masternode:
1788
      raise errors.OpPrereqError("Node is the master node,"
1789
                                 " you need to failover first.")
1790

    
1791
    for instance_name in instance_list:
1792
      instance = self.cfg.GetInstanceInfo(instance_name)
1793
      if node.name in instance.all_nodes:
1794
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1795
                                   " please remove first." % instance_name)
1796
    self.op.node_name = node.name
1797
    self.node = node
1798

    
1799
  def Exec(self, feedback_fn):
1800
    """Removes the node from the cluster.
1801

1802
    """
1803
    node = self.node
1804
    logging.info("Stopping the node daemon and removing configs from node %s",
1805
                 node.name)
1806

    
1807
    self.context.RemoveNode(node.name)
1808

    
1809
    self.rpc.call_node_leave_cluster(node.name)
1810

    
1811
    # Promote nodes to master candidate as needed
1812
    _AdjustCandidatePool(self)
1813

    
1814

    
1815
class LUQueryNodes(NoHooksLU):
1816
  """Logical unit for querying nodes.
1817

1818
  """
1819
  _OP_REQP = ["output_fields", "names", "use_locking"]
1820
  REQ_BGL = False
1821
  _FIELDS_DYNAMIC = utils.FieldSet(
1822
    "dtotal", "dfree",
1823
    "mtotal", "mnode", "mfree",
1824
    "bootid",
1825
    "ctotal", "cnodes", "csockets",
1826
    )
1827

    
1828
  _FIELDS_STATIC = utils.FieldSet(
1829
    "name", "pinst_cnt", "sinst_cnt",
1830
    "pinst_list", "sinst_list",
1831
    "pip", "sip", "tags",
1832
    "serial_no",
1833
    "master_candidate",
1834
    "master",
1835
    "offline",
1836
    "drained",
1837
    )
1838

    
1839
  def ExpandNames(self):
1840
    _CheckOutputFields(static=self._FIELDS_STATIC,
1841
                       dynamic=self._FIELDS_DYNAMIC,
1842
                       selected=self.op.output_fields)
1843

    
1844
    self.needed_locks = {}
1845
    self.share_locks[locking.LEVEL_NODE] = 1
1846

    
1847
    if self.op.names:
1848
      self.wanted = _GetWantedNodes(self, self.op.names)
1849
    else:
1850
      self.wanted = locking.ALL_SET
1851

    
1852
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1853
    self.do_locking = self.do_node_query and self.op.use_locking
1854
    if self.do_locking:
1855
      # if we don't request only static fields, we need to lock the nodes
1856
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1857

    
1858

    
1859
  def CheckPrereq(self):
1860
    """Check prerequisites.
1861

1862
    """
1863
    # The validation of the node list is done in the _GetWantedNodes,
1864
    # if non empty, and if empty, there's no validation to do
1865
    pass
1866

    
1867
  def Exec(self, feedback_fn):
1868
    """Computes the list of nodes and their attributes.
1869

1870
    """
1871
    all_info = self.cfg.GetAllNodesInfo()
1872
    if self.do_locking:
1873
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1874
    elif self.wanted != locking.ALL_SET:
1875
      nodenames = self.wanted
1876
      missing = set(nodenames).difference(all_info.keys())
1877
      if missing:
1878
        raise errors.OpExecError(
1879
          "Some nodes were removed before retrieving their data: %s" % missing)
1880
    else:
1881
      nodenames = all_info.keys()
1882

    
1883
    nodenames = utils.NiceSort(nodenames)
1884
    nodelist = [all_info[name] for name in nodenames]
1885

    
1886
    # begin data gathering
1887

    
1888
    if self.do_node_query:
1889
      live_data = {}
1890
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1891
                                          self.cfg.GetHypervisorType())
1892
      for name in nodenames:
1893
        nodeinfo = node_data[name]
1894
        if not nodeinfo.failed and nodeinfo.data:
1895
          nodeinfo = nodeinfo.data
1896
          fn = utils.TryConvert
1897
          live_data[name] = {
1898
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1899
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1900
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1901
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1902
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1903
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1904
            "bootid": nodeinfo.get('bootid', None),
1905
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1906
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1907
            }
1908
        else:
1909
          live_data[name] = {}
1910
    else:
1911
      live_data = dict.fromkeys(nodenames, {})
1912

    
1913
    node_to_primary = dict([(name, set()) for name in nodenames])
1914
    node_to_secondary = dict([(name, set()) for name in nodenames])
1915

    
1916
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1917
                             "sinst_cnt", "sinst_list"))
1918
    if inst_fields & frozenset(self.op.output_fields):
1919
      instancelist = self.cfg.GetInstanceList()
1920

    
1921
      for instance_name in instancelist:
1922
        inst = self.cfg.GetInstanceInfo(instance_name)
1923
        if inst.primary_node in node_to_primary:
1924
          node_to_primary[inst.primary_node].add(inst.name)
1925
        for secnode in inst.secondary_nodes:
1926
          if secnode in node_to_secondary:
1927
            node_to_secondary[secnode].add(inst.name)
1928

    
1929
    master_node = self.cfg.GetMasterNode()
1930

    
1931
    # end data gathering
1932

    
1933
    output = []
1934
    for node in nodelist:
1935
      node_output = []
1936
      for field in self.op.output_fields:
1937
        if field == "name":
1938
          val = node.name
1939
        elif field == "pinst_list":
1940
          val = list(node_to_primary[node.name])
1941
        elif field == "sinst_list":
1942
          val = list(node_to_secondary[node.name])
1943
        elif field == "pinst_cnt":
1944
          val = len(node_to_primary[node.name])
1945
        elif field == "sinst_cnt":
1946
          val = len(node_to_secondary[node.name])
1947
        elif field == "pip":
1948
          val = node.primary_ip
1949
        elif field == "sip":
1950
          val = node.secondary_ip
1951
        elif field == "tags":
1952
          val = list(node.GetTags())
1953
        elif field == "serial_no":
1954
          val = node.serial_no
1955
        elif field == "master_candidate":
1956
          val = node.master_candidate
1957
        elif field == "master":
1958
          val = node.name == master_node
1959
        elif field == "offline":
1960
          val = node.offline
1961
        elif field == "drained":
1962
          val = node.drained
1963
        elif self._FIELDS_DYNAMIC.Matches(field):
1964
          val = live_data[node.name].get(field, None)
1965
        else:
1966
          raise errors.ParameterError(field)
1967
        node_output.append(val)
1968
      output.append(node_output)
1969

    
1970
    return output
1971

    
1972

    
1973
class LUQueryNodeVolumes(NoHooksLU):
1974
  """Logical unit for getting volumes on node(s).
1975

1976
  """
1977
  _OP_REQP = ["nodes", "output_fields"]
1978
  REQ_BGL = False
1979
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1980
  _FIELDS_STATIC = utils.FieldSet("node")
1981

    
1982
  def ExpandNames(self):
1983
    _CheckOutputFields(static=self._FIELDS_STATIC,
1984
                       dynamic=self._FIELDS_DYNAMIC,
1985
                       selected=self.op.output_fields)
1986

    
1987
    self.needed_locks = {}
1988
    self.share_locks[locking.LEVEL_NODE] = 1
1989
    if not self.op.nodes:
1990
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1991
    else:
1992
      self.needed_locks[locking.LEVEL_NODE] = \
1993
        _GetWantedNodes(self, self.op.nodes)
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    This checks that the fields required are valid output fields.
1999

2000
    """
2001
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2002

    
2003
  def Exec(self, feedback_fn):
2004
    """Computes the list of nodes and their attributes.
2005

2006
    """
2007
    nodenames = self.nodes
2008
    volumes = self.rpc.call_node_volumes(nodenames)
2009

    
2010
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2011
             in self.cfg.GetInstanceList()]
2012

    
2013
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2014

    
2015
    output = []
2016
    for node in nodenames:
2017
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2018
        continue
2019

    
2020
      node_vols = volumes[node].data[:]
2021
      node_vols.sort(key=lambda vol: vol['dev'])
2022

    
2023
      for vol in node_vols:
2024
        node_output = []
2025
        for field in self.op.output_fields:
2026
          if field == "node":
2027
            val = node
2028
          elif field == "phys":
2029
            val = vol['dev']
2030
          elif field == "vg":
2031
            val = vol['vg']
2032
          elif field == "name":
2033
            val = vol['name']
2034
          elif field == "size":
2035
            val = int(float(vol['size']))
2036
          elif field == "instance":
2037
            for inst in ilist:
2038
              if node not in lv_by_node[inst]:
2039
                continue
2040
              if vol['name'] in lv_by_node[inst][node]:
2041
                val = inst.name
2042
                break
2043
            else:
2044
              val = '-'
2045
          else:
2046
            raise errors.ParameterError(field)
2047
          node_output.append(str(val))
2048

    
2049
        output.append(node_output)
2050

    
2051
    return output
2052

    
2053

    
2054
class LUAddNode(LogicalUnit):
2055
  """Logical unit for adding node to the cluster.
2056

2057
  """
2058
  HPATH = "node-add"
2059
  HTYPE = constants.HTYPE_NODE
2060
  _OP_REQP = ["node_name"]
2061

    
2062
  def BuildHooksEnv(self):
2063
    """Build hooks env.
2064

2065
    This will run on all nodes before, and on all nodes + the new node after.
2066

2067
    """
2068
    env = {
2069
      "OP_TARGET": self.op.node_name,
2070
      "NODE_NAME": self.op.node_name,
2071
      "NODE_PIP": self.op.primary_ip,
2072
      "NODE_SIP": self.op.secondary_ip,
2073
      }
2074
    nodes_0 = self.cfg.GetNodeList()
2075
    nodes_1 = nodes_0 + [self.op.node_name, ]
2076
    return env, nodes_0, nodes_1
2077

    
2078
  def CheckPrereq(self):
2079
    """Check prerequisites.
2080

2081
    This checks:
2082
     - the new node is not already in the config
2083
     - it is resolvable
2084
     - its parameters (single/dual homed) matches the cluster
2085

2086
    Any errors are signalled by raising errors.OpPrereqError.
2087

2088
    """
2089
    node_name = self.op.node_name
2090
    cfg = self.cfg
2091

    
2092
    dns_data = utils.HostInfo(node_name)
2093

    
2094
    node = dns_data.name
2095
    primary_ip = self.op.primary_ip = dns_data.ip
2096
    secondary_ip = getattr(self.op, "secondary_ip", None)
2097
    if secondary_ip is None:
2098
      secondary_ip = primary_ip
2099
    if not utils.IsValidIP(secondary_ip):
2100
      raise errors.OpPrereqError("Invalid secondary IP given")
2101
    self.op.secondary_ip = secondary_ip
2102

    
2103
    node_list = cfg.GetNodeList()
2104
    if not self.op.readd and node in node_list:
2105
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2106
                                 node)
2107
    elif self.op.readd and node not in node_list:
2108
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2109

    
2110
    for existing_node_name in node_list:
2111
      existing_node = cfg.GetNodeInfo(existing_node_name)
2112

    
2113
      if self.op.readd and node == existing_node_name:
2114
        if (existing_node.primary_ip != primary_ip or
2115
            existing_node.secondary_ip != secondary_ip):
2116
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2117
                                     " address configuration as before")
2118
        continue
2119

    
2120
      if (existing_node.primary_ip == primary_ip or
2121
          existing_node.secondary_ip == primary_ip or
2122
          existing_node.primary_ip == secondary_ip or
2123
          existing_node.secondary_ip == secondary_ip):
2124
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2125
                                   " existing node %s" % existing_node.name)
2126

    
2127
    # check that the type of the node (single versus dual homed) is the
2128
    # same as for the master
2129
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2130
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2131
    newbie_singlehomed = secondary_ip == primary_ip
2132
    if master_singlehomed != newbie_singlehomed:
2133
      if master_singlehomed:
2134
        raise errors.OpPrereqError("The master has no private ip but the"
2135
                                   " new node has one")
2136
      else:
2137
        raise errors.OpPrereqError("The master has a private ip but the"
2138
                                   " new node doesn't have one")
2139

    
2140
    # checks reachablity
2141
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2142
      raise errors.OpPrereqError("Node not reachable by ping")
2143

    
2144
    if not newbie_singlehomed:
2145
      # check reachability from my secondary ip to newbie's secondary ip
2146
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2147
                           source=myself.secondary_ip):
2148
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2149
                                   " based ping to noded port")
2150

    
2151
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2152
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2153
    master_candidate = mc_now < cp_size
2154

    
2155
    self.new_node = objects.Node(name=node,
2156
                                 primary_ip=primary_ip,
2157
                                 secondary_ip=secondary_ip,
2158
                                 master_candidate=master_candidate,
2159
                                 offline=False, drained=False)
2160

    
2161
  def Exec(self, feedback_fn):
2162
    """Adds the new node to the cluster.
2163

2164
    """
2165
    new_node = self.new_node
2166
    node = new_node.name
2167

    
2168
    # check connectivity
2169
    result = self.rpc.call_version([node])[node]
2170
    result.Raise()
2171
    if result.data:
2172
      if constants.PROTOCOL_VERSION == result.data:
2173
        logging.info("Communication to node %s fine, sw version %s match",
2174
                     node, result.data)
2175
      else:
2176
        raise errors.OpExecError("Version mismatch master version %s,"
2177
                                 " node version %s" %
2178
                                 (constants.PROTOCOL_VERSION, result.data))
2179
    else:
2180
      raise errors.OpExecError("Cannot get version from the new node")
2181

    
2182
    # setup ssh on node
2183
    logging.info("Copy ssh key to node %s", node)
2184
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2185
    keyarray = []
2186
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2187
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2188
                priv_key, pub_key]
2189

    
2190
    for i in keyfiles:
2191
      f = open(i, 'r')
2192
      try:
2193
        keyarray.append(f.read())
2194
      finally:
2195
        f.close()
2196

    
2197
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2198
                                    keyarray[2],
2199
                                    keyarray[3], keyarray[4], keyarray[5])
2200

    
2201
    msg = result.RemoteFailMsg()
2202
    if msg:
2203
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2204
                               " new node: %s" % msg)
2205

    
2206
    # Add node to our /etc/hosts, and add key to known_hosts
2207
    utils.AddHostToEtcHosts(new_node.name)
2208

    
2209
    if new_node.secondary_ip != new_node.primary_ip:
2210
      result = self.rpc.call_node_has_ip_address(new_node.name,
2211
                                                 new_node.secondary_ip)
2212
      if result.failed or not result.data:
2213
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2214
                                 " you gave (%s). Please fix and re-run this"
2215
                                 " command." % new_node.secondary_ip)
2216

    
2217
    node_verify_list = [self.cfg.GetMasterNode()]
2218
    node_verify_param = {
2219
      'nodelist': [node],
2220
      # TODO: do a node-net-test as well?
2221
    }
2222

    
2223
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2224
                                       self.cfg.GetClusterName())
2225
    for verifier in node_verify_list:
2226
      if result[verifier].failed or not result[verifier].data:
2227
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2228
                                 " for remote verification" % verifier)
2229
      if result[verifier].data['nodelist']:
2230
        for failed in result[verifier].data['nodelist']:
2231
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2232
                      (verifier, result[verifier].data['nodelist'][failed]))
2233
        raise errors.OpExecError("ssh/hostname verification failed.")
2234

    
2235
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2236
    # including the node just added
2237
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2238
    dist_nodes = self.cfg.GetNodeList()
2239
    if not self.op.readd:
2240
      dist_nodes.append(node)
2241
    if myself.name in dist_nodes:
2242
      dist_nodes.remove(myself.name)
2243

    
2244
    logging.debug("Copying hosts and known_hosts to all nodes")
2245
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2246
      result = self.rpc.call_upload_file(dist_nodes, fname)
2247
      for to_node, to_result in result.iteritems():
2248
        if to_result.failed or not to_result.data:
2249
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2250

    
2251
    to_copy = []
2252
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2253
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2254
      to_copy.append(constants.VNC_PASSWORD_FILE)
2255

    
2256
    for fname in to_copy:
2257
      result = self.rpc.call_upload_file([node], fname)
2258
      if result[node].failed or not result[node]:
2259
        logging.error("Could not copy file %s to node %s", fname, node)
2260

    
2261
    if self.op.readd:
2262
      self.context.ReaddNode(new_node)
2263
    else:
2264
      self.context.AddNode(new_node)
2265

    
2266

    
2267
class LUSetNodeParams(LogicalUnit):
2268
  """Modifies the parameters of a node.
2269

2270
  """
2271
  HPATH = "node-modify"
2272
  HTYPE = constants.HTYPE_NODE
2273
  _OP_REQP = ["node_name"]
2274
  REQ_BGL = False
2275

    
2276
  def CheckArguments(self):
2277
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2278
    if node_name is None:
2279
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2280
    self.op.node_name = node_name
2281
    _CheckBooleanOpField(self.op, 'master_candidate')
2282
    _CheckBooleanOpField(self.op, 'offline')
2283
    _CheckBooleanOpField(self.op, 'drained')
2284
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2285
    if all_mods.count(None) == 3:
2286
      raise errors.OpPrereqError("Please pass at least one modification")
2287
    if all_mods.count(True) > 1:
2288
      raise errors.OpPrereqError("Can't set the node into more than one"
2289
                                 " state at the same time")
2290

    
2291
  def ExpandNames(self):
2292
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2293

    
2294
  def BuildHooksEnv(self):
2295
    """Build hooks env.
2296

2297
    This runs on the master node.
2298

2299
    """
2300
    env = {
2301
      "OP_TARGET": self.op.node_name,
2302
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2303
      "OFFLINE": str(self.op.offline),
2304
      "DRAINED": str(self.op.drained),
2305
      }
2306
    nl = [self.cfg.GetMasterNode(),
2307
          self.op.node_name]
2308
    return env, nl, nl
2309

    
2310
  def CheckPrereq(self):
2311
    """Check prerequisites.
2312

2313
    This only checks the instance list against the existing names.
2314

2315
    """
2316
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2317

    
2318
    if ((self.op.master_candidate == False or self.op.offline == True or
2319
         self.op.drained == True) and node.master_candidate):
2320
      # we will demote the node from master_candidate
2321
      if self.op.node_name == self.cfg.GetMasterNode():
2322
        raise errors.OpPrereqError("The master node has to be a"
2323
                                   " master candidate, online and not drained")
2324
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2325
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2326
      if num_candidates <= cp_size:
2327
        msg = ("Not enough master candidates (desired"
2328
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2329
        if self.op.force:
2330
          self.LogWarning(msg)
2331
        else:
2332
          raise errors.OpPrereqError(msg)
2333

    
2334
    if (self.op.master_candidate == True and
2335
        ((node.offline and not self.op.offline == False) or
2336
         (node.drained and not self.op.drained == False))):
2337
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2338
                                 " to master_candidate")
2339

    
2340
    return
2341

    
2342
  def Exec(self, feedback_fn):
2343
    """Modifies a node.
2344

2345
    """
2346
    node = self.node
2347

    
2348
    result = []
2349
    changed_mc = False
2350

    
2351
    if self.op.offline is not None:
2352
      node.offline = self.op.offline
2353
      result.append(("offline", str(self.op.offline)))
2354
      if self.op.offline == True:
2355
        if node.master_candidate:
2356
          node.master_candidate = False
2357
          changed_mc = True
2358
          result.append(("master_candidate", "auto-demotion due to offline"))
2359
        if node.drained:
2360
          node.drained = False
2361
          result.append(("drained", "clear drained status due to offline"))
2362

    
2363
    if self.op.master_candidate is not None:
2364
      node.master_candidate = self.op.master_candidate
2365
      changed_mc = True
2366
      result.append(("master_candidate", str(self.op.master_candidate)))
2367
      if self.op.master_candidate == False:
2368
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2369
        msg = rrc.RemoteFailMsg()
2370
        if msg:
2371
          self.LogWarning("Node failed to demote itself: %s" % msg)
2372

    
2373
    if self.op.drained is not None:
2374
      node.drained = self.op.drained
2375
      result.append(("drained", str(self.op.drained)))
2376
      if self.op.drained == True:
2377
        if node.master_candidate:
2378
          node.master_candidate = False
2379
          changed_mc = True
2380
          result.append(("master_candidate", "auto-demotion due to drain"))
2381
        if node.offline:
2382
          node.offline = False
2383
          result.append(("offline", "clear offline status due to drain"))
2384

    
2385
    # this will trigger configuration file update, if needed
2386
    self.cfg.Update(node)
2387
    # this will trigger job queue propagation or cleanup
2388
    if changed_mc:
2389
      self.context.ReaddNode(node)
2390

    
2391
    return result
2392

    
2393

    
2394
class LUQueryClusterInfo(NoHooksLU):
2395
  """Query cluster configuration.
2396

2397
  """
2398
  _OP_REQP = []
2399
  REQ_BGL = False
2400

    
2401
  def ExpandNames(self):
2402
    self.needed_locks = {}
2403

    
2404
  def CheckPrereq(self):
2405
    """No prerequsites needed for this LU.
2406

2407
    """
2408
    pass
2409

    
2410
  def Exec(self, feedback_fn):
2411
    """Return cluster config.
2412

2413
    """
2414
    cluster = self.cfg.GetClusterInfo()
2415
    result = {
2416
      "software_version": constants.RELEASE_VERSION,
2417
      "protocol_version": constants.PROTOCOL_VERSION,
2418
      "config_version": constants.CONFIG_VERSION,
2419
      "os_api_version": constants.OS_API_VERSION,
2420
      "export_version": constants.EXPORT_VERSION,
2421
      "architecture": (platform.architecture()[0], platform.machine()),
2422
      "name": cluster.cluster_name,
2423
      "master": cluster.master_node,
2424
      "default_hypervisor": cluster.default_hypervisor,
2425
      "enabled_hypervisors": cluster.enabled_hypervisors,
2426
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2427
                        for hypervisor in cluster.enabled_hypervisors]),
2428
      "beparams": cluster.beparams,
2429
      "candidate_pool_size": cluster.candidate_pool_size,
2430
      }
2431

    
2432
    return result
2433

    
2434

    
2435
class LUQueryConfigValues(NoHooksLU):
2436
  """Return configuration values.
2437

2438
  """
2439
  _OP_REQP = []
2440
  REQ_BGL = False
2441
  _FIELDS_DYNAMIC = utils.FieldSet()
2442
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2443

    
2444
  def ExpandNames(self):
2445
    self.needed_locks = {}
2446

    
2447
    _CheckOutputFields(static=self._FIELDS_STATIC,
2448
                       dynamic=self._FIELDS_DYNAMIC,
2449
                       selected=self.op.output_fields)
2450

    
2451
  def CheckPrereq(self):
2452
    """No prerequisites.
2453

2454
    """
2455
    pass
2456

    
2457
  def Exec(self, feedback_fn):
2458
    """Dump a representation of the cluster config to the standard output.
2459

2460
    """
2461
    values = []
2462
    for field in self.op.output_fields:
2463
      if field == "cluster_name":
2464
        entry = self.cfg.GetClusterName()
2465
      elif field == "master_node":
2466
        entry = self.cfg.GetMasterNode()
2467
      elif field == "drain_flag":
2468
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2469
      else:
2470
        raise errors.ParameterError(field)
2471
      values.append(entry)
2472
    return values
2473

    
2474

    
2475
class LUActivateInstanceDisks(NoHooksLU):
2476
  """Bring up an instance's disks.
2477

2478
  """
2479
  _OP_REQP = ["instance_name"]
2480
  REQ_BGL = False
2481

    
2482
  def ExpandNames(self):
2483
    self._ExpandAndLockInstance()
2484
    self.needed_locks[locking.LEVEL_NODE] = []
2485
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2486

    
2487
  def DeclareLocks(self, level):
2488
    if level == locking.LEVEL_NODE:
2489
      self._LockInstancesNodes()
2490

    
2491
  def CheckPrereq(self):
2492
    """Check prerequisites.
2493

2494
    This checks that the instance is in the cluster.
2495

2496
    """
2497
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2498
    assert self.instance is not None, \
2499
      "Cannot retrieve locked instance %s" % self.op.instance_name
2500
    _CheckNodeOnline(self, self.instance.primary_node)
2501

    
2502
  def Exec(self, feedback_fn):
2503
    """Activate the disks.
2504

2505
    """
2506
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2507
    if not disks_ok:
2508
      raise errors.OpExecError("Cannot activate block devices")
2509

    
2510
    return disks_info
2511

    
2512

    
2513
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2514
  """Prepare the block devices for an instance.
2515

2516
  This sets up the block devices on all nodes.
2517

2518
  @type lu: L{LogicalUnit}
2519
  @param lu: the logical unit on whose behalf we execute
2520
  @type instance: L{objects.Instance}
2521
  @param instance: the instance for whose disks we assemble
2522
  @type ignore_secondaries: boolean
2523
  @param ignore_secondaries: if true, errors on secondary nodes
2524
      won't result in an error return from the function
2525
  @return: False if the operation failed, otherwise a list of
2526
      (host, instance_visible_name, node_visible_name)
2527
      with the mapping from node devices to instance devices
2528

2529
  """
2530
  device_info = []
2531
  disks_ok = True
2532
  iname = instance.name
2533
  # With the two passes mechanism we try to reduce the window of
2534
  # opportunity for the race condition of switching DRBD to primary
2535
  # before handshaking occured, but we do not eliminate it
2536

    
2537
  # The proper fix would be to wait (with some limits) until the
2538
  # connection has been made and drbd transitions from WFConnection
2539
  # into any other network-connected state (Connected, SyncTarget,
2540
  # SyncSource, etc.)
2541

    
2542
  # 1st pass, assemble on all nodes in secondary mode
2543
  for inst_disk in instance.disks:
2544
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2545
      lu.cfg.SetDiskID(node_disk, node)
2546
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2547
      msg = result.RemoteFailMsg()
2548
      if msg:
2549
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2550
                           " (is_primary=False, pass=1): %s",
2551
                           inst_disk.iv_name, node, msg)
2552
        if not ignore_secondaries:
2553
          disks_ok = False
2554

    
2555
  # FIXME: race condition on drbd migration to primary
2556

    
2557
  # 2nd pass, do only the primary node
2558
  for inst_disk in instance.disks:
2559
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2560
      if node != instance.primary_node:
2561
        continue
2562
      lu.cfg.SetDiskID(node_disk, node)
2563
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2564
      msg = result.RemoteFailMsg()
2565
      if msg:
2566
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2567
                           " (is_primary=True, pass=2): %s",
2568
                           inst_disk.iv_name, node, msg)
2569
        disks_ok = False
2570
    device_info.append((instance.primary_node, inst_disk.iv_name,
2571
                        result.payload))
2572

    
2573
  # leave the disks configured for the primary node
2574
  # this is a workaround that would be fixed better by
2575
  # improving the logical/physical id handling
2576
  for disk in instance.disks:
2577
    lu.cfg.SetDiskID(disk, instance.primary_node)
2578

    
2579
  return disks_ok, device_info
2580

    
2581

    
2582
def _StartInstanceDisks(lu, instance, force):
2583
  """Start the disks of an instance.
2584

2585
  """
2586
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2587
                                           ignore_secondaries=force)
2588
  if not disks_ok:
2589
    _ShutdownInstanceDisks(lu, instance)
2590
    if force is not None and not force:
2591
      lu.proc.LogWarning("", hint="If the message above refers to a"
2592
                         " secondary node,"
2593
                         " you can retry the operation using '--force'.")
2594
    raise errors.OpExecError("Disk consistency error")
2595

    
2596

    
2597
class LUDeactivateInstanceDisks(NoHooksLU):
2598
  """Shutdown an instance's disks.
2599

2600
  """
2601
  _OP_REQP = ["instance_name"]
2602
  REQ_BGL = False
2603

    
2604
  def ExpandNames(self):
2605
    self._ExpandAndLockInstance()
2606
    self.needed_locks[locking.LEVEL_NODE] = []
2607
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2608

    
2609
  def DeclareLocks(self, level):
2610
    if level == locking.LEVEL_NODE:
2611
      self._LockInstancesNodes()
2612

    
2613
  def CheckPrereq(self):
2614
    """Check prerequisites.
2615

2616
    This checks that the instance is in the cluster.
2617

2618
    """
2619
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2620
    assert self.instance is not None, \
2621
      "Cannot retrieve locked instance %s" % self.op.instance_name
2622

    
2623
  def Exec(self, feedback_fn):
2624
    """Deactivate the disks
2625

2626
    """
2627
    instance = self.instance
2628
    _SafeShutdownInstanceDisks(self, instance)
2629

    
2630

    
2631
def _SafeShutdownInstanceDisks(lu, instance):
2632
  """Shutdown block devices of an instance.
2633

2634
  This function checks if an instance is running, before calling
2635
  _ShutdownInstanceDisks.
2636

2637
  """
2638
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2639
                                      [instance.hypervisor])
2640
  ins_l = ins_l[instance.primary_node]
2641
  if ins_l.failed or not isinstance(ins_l.data, list):
2642
    raise errors.OpExecError("Can't contact node '%s'" %
2643
                             instance.primary_node)
2644

    
2645
  if instance.name in ins_l.data:
2646
    raise errors.OpExecError("Instance is running, can't shutdown"
2647
                             " block devices.")
2648

    
2649
  _ShutdownInstanceDisks(lu, instance)
2650

    
2651

    
2652
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2653
  """Shutdown block devices of an instance.
2654

2655
  This does the shutdown on all nodes of the instance.
2656

2657
  If the ignore_primary is false, errors on the primary node are
2658
  ignored.
2659

2660
  """
2661
  all_result = True
2662
  for disk in instance.disks:
2663
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2664
      lu.cfg.SetDiskID(top_disk, node)
2665
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2666
      msg = result.RemoteFailMsg()
2667
      if msg:
2668
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2669
                      disk.iv_name, node, msg)
2670
        if not ignore_primary or node != instance.primary_node:
2671
          all_result = False
2672
  return all_result
2673

    
2674

    
2675
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2676
  """Checks if a node has enough free memory.
2677

2678
  This function check if a given node has the needed amount of free
2679
  memory. In case the node has less memory or we cannot get the
2680
  information from the node, this function raise an OpPrereqError
2681
  exception.
2682

2683
  @type lu: C{LogicalUnit}
2684
  @param lu: a logical unit from which we get configuration data
2685
  @type node: C{str}
2686
  @param node: the node to check
2687
  @type reason: C{str}
2688
  @param reason: string to use in the error message
2689
  @type requested: C{int}
2690
  @param requested: the amount of memory in MiB to check for
2691
  @type hypervisor_name: C{str}
2692
  @param hypervisor_name: the hypervisor to ask for memory stats
2693
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2694
      we cannot check the node
2695

2696
  """
2697
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2698
  nodeinfo[node].Raise()
2699
  free_mem = nodeinfo[node].data.get('memory_free')
2700
  if not isinstance(free_mem, int):
2701
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2702
                             " was '%s'" % (node, free_mem))
2703
  if requested > free_mem:
2704
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2705
                             " needed %s MiB, available %s MiB" %
2706
                             (node, reason, requested, free_mem))
2707

    
2708

    
2709
class LUStartupInstance(LogicalUnit):
2710
  """Starts an instance.
2711

2712
  """
2713
  HPATH = "instance-start"
2714
  HTYPE = constants.HTYPE_INSTANCE
2715
  _OP_REQP = ["instance_name", "force"]
2716
  REQ_BGL = False
2717

    
2718
  def ExpandNames(self):
2719
    self._ExpandAndLockInstance()
2720

    
2721
  def BuildHooksEnv(self):
2722
    """Build hooks env.
2723

2724
    This runs on master, primary and secondary nodes of the instance.
2725

2726
    """
2727
    env = {
2728
      "FORCE": self.op.force,
2729
      }
2730
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2731
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2732
    return env, nl, nl
2733

    
2734
  def CheckPrereq(self):
2735
    """Check prerequisites.
2736

2737
    This checks that the instance is in the cluster.
2738

2739
    """
2740
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2741
    assert self.instance is not None, \
2742
      "Cannot retrieve locked instance %s" % self.op.instance_name
2743

    
2744
    _CheckNodeOnline(self, instance.primary_node)
2745

    
2746
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2747
    # check bridges existance
2748
    _CheckInstanceBridgesExist(self, instance)
2749

    
2750
    _CheckNodeFreeMemory(self, instance.primary_node,
2751
                         "starting instance %s" % instance.name,
2752
                         bep[constants.BE_MEMORY], instance.hypervisor)
2753

    
2754
  def Exec(self, feedback_fn):
2755
    """Start the instance.
2756

2757
    """
2758
    instance = self.instance
2759
    force = self.op.force
2760

    
2761
    self.cfg.MarkInstanceUp(instance.name)
2762

    
2763
    node_current = instance.primary_node
2764

    
2765
    _StartInstanceDisks(self, instance, force)
2766

    
2767
    result = self.rpc.call_instance_start(node_current, instance)
2768
    msg = result.RemoteFailMsg()
2769
    if msg:
2770
      _ShutdownInstanceDisks(self, instance)
2771
      raise errors.OpExecError("Could not start instance: %s" % msg)
2772

    
2773

    
2774
class LURebootInstance(LogicalUnit):
2775
  """Reboot an instance.
2776

2777
  """
2778
  HPATH = "instance-reboot"
2779
  HTYPE = constants.HTYPE_INSTANCE
2780
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2781
  REQ_BGL = False
2782

    
2783
  def ExpandNames(self):
2784
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2785
                                   constants.INSTANCE_REBOOT_HARD,
2786
                                   constants.INSTANCE_REBOOT_FULL]:
2787
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2788
                                  (constants.INSTANCE_REBOOT_SOFT,
2789
                                   constants.INSTANCE_REBOOT_HARD,
2790
                                   constants.INSTANCE_REBOOT_FULL))
2791
    self._ExpandAndLockInstance()
2792

    
2793
  def BuildHooksEnv(self):
2794
    """Build hooks env.
2795

2796
    This runs on master, primary and secondary nodes of the instance.
2797

2798
    """
2799
    env = {
2800
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2801
      "REBOOT_TYPE": self.op.reboot_type,
2802
      }
2803
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2804
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2805
    return env, nl, nl
2806

    
2807
  def CheckPrereq(self):
2808
    """Check prerequisites.
2809

2810
    This checks that the instance is in the cluster.
2811

2812
    """
2813
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2814
    assert self.instance is not None, \
2815
      "Cannot retrieve locked instance %s" % self.op.instance_name
2816

    
2817
    _CheckNodeOnline(self, instance.primary_node)
2818

    
2819
    # check bridges existance
2820
    _CheckInstanceBridgesExist(self, instance)
2821

    
2822
  def Exec(self, feedback_fn):
2823
    """Reboot the instance.
2824

2825
    """
2826
    instance = self.instance
2827
    ignore_secondaries = self.op.ignore_secondaries
2828
    reboot_type = self.op.reboot_type
2829

    
2830
    node_current = instance.primary_node
2831

    
2832
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2833
                       constants.INSTANCE_REBOOT_HARD]:
2834
      for disk in instance.disks:
2835
        self.cfg.SetDiskID(disk, node_current)
2836
      result = self.rpc.call_instance_reboot(node_current, instance,
2837
                                             reboot_type)
2838
      msg = result.RemoteFailMsg()
2839
      if msg:
2840
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2841
    else:
2842
      result = self.rpc.call_instance_shutdown(node_current, instance)
2843
      msg = result.RemoteFailMsg()
2844
      if msg:
2845
        raise errors.OpExecError("Could not shutdown instance for"
2846
                                 " full reboot: %s" % msg)
2847
      _ShutdownInstanceDisks(self, instance)
2848
      _StartInstanceDisks(self, instance, ignore_secondaries)
2849
      result = self.rpc.call_instance_start(node_current, instance)
2850
      msg = result.RemoteFailMsg()
2851
      if msg:
2852
        _ShutdownInstanceDisks(self, instance)
2853
        raise errors.OpExecError("Could not start instance for"
2854
                                 " full reboot: %s" % msg)
2855

    
2856
    self.cfg.MarkInstanceUp(instance.name)
2857

    
2858

    
2859
class LUShutdownInstance(LogicalUnit):
2860
  """Shutdown an instance.
2861

2862
  """
2863
  HPATH = "instance-stop"
2864
  HTYPE = constants.HTYPE_INSTANCE
2865
  _OP_REQP = ["instance_name"]
2866
  REQ_BGL = False
2867

    
2868
  def ExpandNames(self):
2869
    self._ExpandAndLockInstance()
2870

    
2871
  def BuildHooksEnv(self):
2872
    """Build hooks env.
2873

2874
    This runs on master, primary and secondary nodes of the instance.
2875

2876
    """
2877
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2878
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2879
    return env, nl, nl
2880

    
2881
  def CheckPrereq(self):
2882
    """Check prerequisites.
2883

2884
    This checks that the instance is in the cluster.
2885

2886
    """
2887
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2888
    assert self.instance is not None, \
2889
      "Cannot retrieve locked instance %s" % self.op.instance_name
2890
    _CheckNodeOnline(self, self.instance.primary_node)
2891

    
2892
  def Exec(self, feedback_fn):
2893
    """Shutdown the instance.
2894

2895
    """
2896
    instance = self.instance
2897
    node_current = instance.primary_node
2898
    self.cfg.MarkInstanceDown(instance.name)
2899
    result = self.rpc.call_instance_shutdown(node_current, instance)
2900
    msg = result.RemoteFailMsg()
2901
    if msg:
2902
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2903

    
2904
    _ShutdownInstanceDisks(self, instance)
2905

    
2906

    
2907
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


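# Note on LURemoveInstance below: the instance is shut down on its primary
# node, its disks are removed via _RemoveDisks, and only then is it dropped
# from the cluster configuration; with ignore_failures set, shutdown and
# disk-removal errors are downgraded to warnings instead of aborting.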
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


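# Note on LUQueryInstances below: the static field set mixes plain names
# with regular expressions for indexed fields, e.g. "disk.count" (number of
# disks), "disk.size/0" (size of the first disk) or "nic.mac/1" (MAC of the
# second NIC); per-parameter "hv/<name>" and "be/<name>" fields are
# generated from constants.HVS_PARAMETERS and constants.BES_PARAMETERS.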
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


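# Note on LUFailoverInstance below: failover is only allowed for
# network-mirrored disk templates; it checks disk consistency, shuts the
# instance down on the current primary, makes the (single) secondary the new
# primary in the configuration and, if the instance is marked up,
# reassembles the disks and starts it there.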
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


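# Note on LUMigrateInstance below: live migration is restricted to DRBD8
# instances.  Both _ExecMigration and _ExecCleanup are built from the same
# disk-state helpers (_EnsureSecondary, _GoStandalone, _GoReconnect,
# _WaitUntilSync), which close the devices, detach them from the network,
# re-attach them in single- or dual-master mode and poll until resync is
# complete.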
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks on node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


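# Note on _CreateBlockDev below: it recurses depth-first over
# device.children; once a device reports CreateOnSecondary(), force_create
# is set and propagated to its children, so that device and its whole
# subtree are created, while other devices are created only if the caller
# already passed force_create=True.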
def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device for which
      CreateOnSecondary() is true
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """