
root / lib / cmdlib.py @ 35e994e9


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods need no longer worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, ecc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not have the 'GANETI_' prefix, as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes, an empty list (and not None) should be returned.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged, but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instance's nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we really have been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
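
  # Illustrative sketch (not from the original code): LUs that act on a
  # single instance typically combine the two helpers above like this:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()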
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
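

# Example only (not part of the original module): a minimal LU illustrating
# the ExpandNames/CheckPrereq/Exec contract documented in LogicalUnit above.
# The opcode attribute "message" is a hypothetical parameter used purely for
# illustration; no real Ganeti opcode defines it.
class _LUExampleEcho(NoHooksLU):
  """Illustrative LU that echoes a message through the feedback function.

  """
  _OP_REQP = ["message"]
  REQ_BGL = False

  def ExpandNames(self):
    # no locks are needed; an empty dict is required (None is not allowed)
    self.needed_locks = {}

  def CheckPrereq(self):
    # nothing cluster-related to check for this example
    pass

  def Exec(self, feedback_fn):
    feedback_fn("echo: %s" % self.op.message)
    return self.op.message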
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396
  return wanted
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
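
# Illustrative usage (not from the original code): query LUs call the check
# above roughly as follows, where _FIELDS_STATIC and _FIELDS_DYNAMIC are
# utils.FieldSet instances defined on the LU class:
#
#   _CheckOutputFields(static=self._FIELDS_STATIC,
#                      dynamic=self._FIELDS_DYNAMIC,
#                      selected=self.op.output_fields)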
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _CheckNodeNotDrained(lu, node):
445
  """Ensure that a given node is not drained.
446

447
  @param lu: the LU on behalf of which we make the check
448
  @param node: the node to check
449
  @raise errors.OpPrereqError: if the node is drained
450

451
  """
452
  if lu.cfg.GetNodeInfo(node).drained:
453
    raise errors.OpPrereqError("Can't use drained node %s" % node)
454

    
455

    
456
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457
                          memory, vcpus, nics, disk_template, disks):
458
  """Builds instance related env variables for hooks
459

460
  This builds the hook environment from individual variables.
461

462
  @type name: string
463
  @param name: the name of the instance
464
  @type primary_node: string
465
  @param primary_node: the name of the instance's primary node
466
  @type secondary_nodes: list
467
  @param secondary_nodes: list of secondary nodes as strings
468
  @type os_type: string
469
  @param os_type: the name of the instance's OS
470
  @type status: boolean
471
  @param status: the should_run status of the instance
472
  @type memory: string
473
  @param memory: the memory size of the instance
474
  @type vcpus: string
475
  @param vcpus: the count of VCPUs the instance has
476
  @type nics: list
477
  @param nics: list of tuples (ip, bridge, mac) representing
478
      the NICs the instance has
479
  @type disk_template: string
480
  @param disk_template: the disk template of the instance
481
  @type disks: list
482
  @param disks: the list of (size, mode) pairs
483
  @rtype: dict
484
  @return: the hook environment for this instance
485

486
  """
487
  if status:
488
    str_status = "up"
489
  else:
490
    str_status = "down"
491
  env = {
492
    "OP_TARGET": name,
493
    "INSTANCE_NAME": name,
494
    "INSTANCE_PRIMARY": primary_node,
495
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496
    "INSTANCE_OS_TYPE": os_type,
497
    "INSTANCE_STATUS": str_status,
498
    "INSTANCE_MEMORY": memory,
499
    "INSTANCE_VCPUS": vcpus,
500
    "INSTANCE_DISK_TEMPLATE": disk_template,
501
  }
502

    
503
  if nics:
504
    nic_count = len(nics)
505
    for idx, (ip, bridge, mac) in enumerate(nics):
506
      if ip is None:
507
        ip = ""
508
      env["INSTANCE_NIC%d_IP" % idx] = ip
509
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510
      env["INSTANCE_NIC%d_MAC" % idx] = mac
511
  else:
512
    nic_count = 0
513

    
514
  env["INSTANCE_NIC_COUNT"] = nic_count
515

    
516
  if disks:
517
    disk_count = len(disks)
518
    for idx, (size, mode) in enumerate(disks):
519
      env["INSTANCE_DISK%d_SIZE" % idx] = size
520
      env["INSTANCE_DISK%d_MODE" % idx] = mode
521
  else:
522
    disk_count = 0
523

    
524
  env["INSTANCE_DISK_COUNT"] = disk_count
525

    
526
  return env
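
# For illustration only (hypothetical values): an instance "inst1.example.com"
# that is up, with one NIC on bridge "xen-br0" and a single 10240 MB "rw"
# disk, would yield (among others):
#
#   OP_TARGET=inst1.example.com     INSTANCE_NAME=inst1.example.com
#   INSTANCE_STATUS=up              INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_BRIDGE=xen-br0    INSTANCE_DISK_COUNT=1
#   INSTANCE_DISK0_SIZE=10240       INSTANCE_DISK0_MODE=rw
#
# The hooks runner later adds the GANETI_ prefix to these keys.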
527

    
528

    
529
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530
  """Builds instance related env variables for hooks from an object.
531

532
  @type lu: L{LogicalUnit}
533
  @param lu: the logical unit on whose behalf we execute
534
  @type instance: L{objects.Instance}
535
  @param instance: the instance for which we should build the
536
      environment
537
  @type override: dict
538
  @param override: dictionary with key/values that will override
539
      our values
540
  @rtype: dict
541
  @return: the hook environment dictionary
542

543
  """
544
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
545
  args = {
546
    'name': instance.name,
547
    'primary_node': instance.primary_node,
548
    'secondary_nodes': instance.secondary_nodes,
549
    'os_type': instance.os,
550
    'status': instance.admin_up,
551
    'memory': bep[constants.BE_MEMORY],
552
    'vcpus': bep[constants.BE_VCPUS],
553
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554
    'disk_template': instance.disk_template,
555
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
556
  }
557
  if override:
558
    args.update(override)
559
  return _BuildInstanceHookEnv(**args)
560

    
561

    
562
def _AdjustCandidatePool(lu):
563
  """Adjust the candidate pool after node operations.
564

565
  """
566
  mod_list = lu.cfg.MaintainCandidatePool()
567
  if mod_list:
568
    lu.LogInfo("Promoted nodes to master candidate role: %s",
569
               ", ".join(node.name for node in mod_list))
570
    for name in mod_list:
571
      lu.context.ReaddNode(name)
572
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573
  if mc_now > mc_max:
574
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575
               (mc_now, mc_max))
576

    
577

    
578
def _CheckInstanceBridgesExist(lu, instance):
579
  """Check that the bridges needed by an instance exist.
580

581
  """
582
  # check bridges existence
583
  brlist = [nic.bridge for nic in instance.nics]
584
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585
  result.Raise()
586
  if not result.data:
587
    raise errors.OpPrereqError("One or more target bridges %s does not"
588
                               " exist on destination node '%s'" %
589
                               (brlist, instance.primary_node))
590

    
591

    
592
class LUDestroyCluster(NoHooksLU):
593
  """Logical unit for destroying the cluster.
594

595
  """
596
  _OP_REQP = []
597

    
598
  def CheckPrereq(self):
599
    """Check prerequisites.
600

601
    This checks whether the cluster is empty.
602

603
    Any errors are signalled by raising errors.OpPrereqError.
604

605
    """
606
    master = self.cfg.GetMasterNode()
607

    
608
    nodelist = self.cfg.GetNodeList()
609
    if len(nodelist) != 1 or nodelist[0] != master:
610
      raise errors.OpPrereqError("There are still %d node(s) in"
611
                                 " this cluster." % (len(nodelist) - 1))
612
    instancelist = self.cfg.GetInstanceList()
613
    if instancelist:
614
      raise errors.OpPrereqError("There are still %d instance(s) in"
615
                                 " this cluster." % len(instancelist))
616

    
617
  def Exec(self, feedback_fn):
618
    """Destroys the cluster.
619

620
    """
621
    master = self.cfg.GetMasterNode()
622
    result = self.rpc.call_node_stop_master(master, False)
623
    result.Raise()
624
    if not result.data:
625
      raise errors.OpExecError("Could not disable the master role")
626
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627
    utils.CreateBackup(priv_key)
628
    utils.CreateBackup(pub_key)
629
    return master
630

    
631

    
632
class LUVerifyCluster(LogicalUnit):
633
  """Verifies the cluster status.
634

635
  """
636
  HPATH = "cluster-verify"
637
  HTYPE = constants.HTYPE_CLUSTER
638
  _OP_REQP = ["skip_checks"]
639
  REQ_BGL = False
640

    
641
  def ExpandNames(self):
642
    self.needed_locks = {
643
      locking.LEVEL_NODE: locking.ALL_SET,
644
      locking.LEVEL_INSTANCE: locking.ALL_SET,
645
    }
646
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647

    
648
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649
                  node_result, feedback_fn, master_files,
650
                  drbd_map, vg_name):
651
    """Run multiple tests against a node.
652

653
    Test list:
654

655
      - compares ganeti version
656
      - checks vg existence and size > 20G
657
      - checks config file checksum
658
      - checks ssh to other nodes
659

660
    @type nodeinfo: L{objects.Node}
661
    @param nodeinfo: the node to check
662
    @param file_list: required list of files
663
    @param local_cksum: dictionary of local files and their checksums
664
    @param node_result: the results from the node
665
    @param feedback_fn: function used to accumulate results
666
    @param master_files: list of files that only masters should have
667
    @param drbd_map: the used drbd minors for this node, in
668
        the form of minor: (instance, must_exist), which correspond to instances
669
        and their running status
670
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
671

672
    """
673
    node = nodeinfo.name
674

    
675
    # main result, node_result should be a non-empty dict
676
    if not node_result or not isinstance(node_result, dict):
677
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
678
      return True
679

    
680
    # compares ganeti version
681
    local_version = constants.PROTOCOL_VERSION
682
    remote_version = node_result.get('version', None)
683
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
684
            len(remote_version) == 2):
685
      feedback_fn("  - ERROR: connection to %s failed" % (node))
686
      return True
687

    
688
    if local_version != remote_version[0]:
689
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
690
                  " node %s %s" % (local_version, node, remote_version[0]))
691
      return True
692

    
693
    # node seems compatible, we can actually try to look into its results
694

    
695
    bad = False
696

    
697
    # full package version
698
    if constants.RELEASE_VERSION != remote_version[1]:
699
      feedback_fn("  - WARNING: software version mismatch: master %s,"
700
                  " node %s %s" %
701
                  (constants.RELEASE_VERSION, node, remote_version[1]))
702

    
703
    # checks vg existence and size > 20G
704
    if vg_name is not None:
705
      vglist = node_result.get(constants.NV_VGLIST, None)
706
      if not vglist:
707
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
708
                        (node,))
709
        bad = True
710
      else:
711
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
712
                                              constants.MIN_VG_SIZE)
713
        if vgstatus:
714
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
715
          bad = True
716

    
717
    # checks config file checksum
718

    
719
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
720
    if not isinstance(remote_cksum, dict):
721
      bad = True
722
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
723
    else:
724
      for file_name in file_list:
725
        node_is_mc = nodeinfo.master_candidate
726
        must_have_file = file_name not in master_files
727
        if file_name not in remote_cksum:
728
          if node_is_mc or must_have_file:
729
            bad = True
730
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
731
        elif remote_cksum[file_name] != local_cksum[file_name]:
732
          if node_is_mc or must_have_file:
733
            bad = True
734
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
735
          else:
736
            # not candidate and this is not a must-have file
737
            bad = True
738
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
739
                        " '%s'" % file_name)
740
        else:
741
          # all good, except non-master/non-must have combination
742
          if not node_is_mc and not must_have_file:
743
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
744
                        " candidates" % file_name)
745

    
746
    # checks ssh to any
747

    
748
    if constants.NV_NODELIST not in node_result:
749
      bad = True
750
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
751
    else:
752
      if node_result[constants.NV_NODELIST]:
753
        bad = True
754
        for node in node_result[constants.NV_NODELIST]:
755
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
756
                          (node, node_result[constants.NV_NODELIST][node]))
757

    
758
    if constants.NV_NODENETTEST not in node_result:
759
      bad = True
760
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
761
    else:
762
      if node_result[constants.NV_NODENETTEST]:
763
        bad = True
764
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
765
        for node in nlist:
766
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
767
                          (node, node_result[constants.NV_NODENETTEST][node]))
768

    
769
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
770
    if isinstance(hyp_result, dict):
771
      for hv_name, hv_result in hyp_result.iteritems():
772
        if hv_result is not None:
773
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
774
                      (hv_name, hv_result))
775

    
776
    # check used drbd list
777
    if vg_name is not None:
778
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
779
      if not isinstance(used_minors, (tuple, list)):
780
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
781
                    str(used_minors))
782
      else:
783
        for minor, (iname, must_exist) in drbd_map.items():
784
          if minor not in used_minors and must_exist:
785
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
786
                        " not active" % (minor, iname))
787
            bad = True
788
        for minor in used_minors:
789
          if minor not in drbd_map:
790
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
791
                        minor)
792
            bad = True
793

    
794
    return bad
795

    
796
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
797
                      node_instance, feedback_fn, n_offline):
798
    """Verify an instance.
799

800
    This function checks to see if the required block devices are
801
    available on the instance's node.
802

803
    """
804
    bad = False
805

    
806
    node_current = instanceconfig.primary_node
807

    
808
    node_vol_should = {}
809
    instanceconfig.MapLVsByNode(node_vol_should)
810

    
811
    for node in node_vol_should:
812
      if node in n_offline:
813
        # ignore missing volumes on offline nodes
814
        continue
815
      for volume in node_vol_should[node]:
816
        if node not in node_vol_is or volume not in node_vol_is[node]:
817
          feedback_fn("  - ERROR: volume %s missing on node %s" %
818
                          (volume, node))
819
          bad = True
820

    
821
    if instanceconfig.admin_up:
822
      if ((node_current not in node_instance or
823
          not instance in node_instance[node_current]) and
824
          node_current not in n_offline):
825
        feedback_fn("  - ERROR: instance %s not running on node %s" %
826
                        (instance, node_current))
827
        bad = True
828

    
829
    for node in node_instance:
830
      if (not node == node_current):
831
        if instance in node_instance[node]:
832
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
833
                          (instance, node))
834
          bad = True
835

    
836
    return bad
837

    
838
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
839
    """Verify if there are any unknown volumes in the cluster.
840

841
    The .os, .swap and backup volumes are ignored. All other volumes are
842
    reported as unknown.
843

844
    """
845
    bad = False
846

    
847
    for node in node_vol_is:
848
      for volume in node_vol_is[node]:
849
        if node not in node_vol_should or volume not in node_vol_should[node]:
850
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
851
                      (volume, node))
852
          bad = True
853
    return bad
854

    
855
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
856
    """Verify the list of running instances.
857

858
    This checks what instances are running but unknown to the cluster.
859

860
    """
861
    bad = False
862
    for node in node_instance:
863
      for runninginstance in node_instance[node]:
864
        if runninginstance not in instancelist:
865
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
866
                          (runninginstance, node))
867
          bad = True
868
    return bad
869

    
870
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
871
    """Verify N+1 Memory Resilience.
872

873
    Check that if one single node dies we can still start all the instances it
874
    was primary for.
875

876
    """
877
    bad = False
878

    
879
    for node, nodeinfo in node_info.iteritems():
880
      # This code checks that every node which is now listed as secondary has
881
      # enough memory to host all instances it is supposed to, should a single
882
      # other node in the cluster fail.
883
      # FIXME: not ready for failover to an arbitrary node
884
      # FIXME: does not support file-backed instances
885
      # WARNING: we currently take into account down instances as well as up
886
      # ones, considering that even if they're down someone might want to start
887
      # them even in the event of a node failure.
888
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
889
        needed_mem = 0
890
        for instance in instances:
891
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
892
          if bep[constants.BE_AUTO_BALANCE]:
893
            needed_mem += bep[constants.BE_MEMORY]
894
        if nodeinfo['mfree'] < needed_mem:
895
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
896
                      " failovers should node %s fail" % (node, prinode))
897
          bad = True
898
    return bad
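
  # Worked example (illustrative numbers): if node A reports mfree=4096 MB
  # and is secondary for two auto-balanced instances whose primary is node B,
  # with BE_MEMORY of 2048 and 3072 MB, then needed_mem for the (A, B) pair
  # is 5120 MB > 4096 MB, and the check above reports an N+1 error for A.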
899

    
900
  def CheckPrereq(self):
901
    """Check prerequisites.
902

903
    Transform the list of checks we're going to skip into a set and check that
904
    all its members are valid.
905

906
    """
907
    self.skip_set = frozenset(self.op.skip_checks)
908
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
909
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
910

    
911
  def BuildHooksEnv(self):
912
    """Build hooks env.
913

914
    Cluster-Verify hooks are just run in the post phase, and their failure makes
915
    the output be logged in the verify output and the verification fail.
916

917
    """
918
    all_nodes = self.cfg.GetNodeList()
919
    env = {
920
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
921
      }
922
    for node in self.cfg.GetAllNodesInfo().values():
923
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
924

    
925
    return env, [], all_nodes
926

    
927
  def Exec(self, feedback_fn):
928
    """Verify integrity of cluster, performing various test on nodes.
929

930
    """
931
    bad = False
932
    feedback_fn("* Verifying global settings")
933
    for msg in self.cfg.VerifyConfig():
934
      feedback_fn("  - ERROR: %s" % msg)
935

    
936
    vg_name = self.cfg.GetVGName()
937
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
938
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
939
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
940
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
941
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
942
                        for iname in instancelist)
943
    i_non_redundant = [] # Non redundant instances
944
    i_non_a_balanced = [] # Non auto-balanced instances
945
    n_offline = [] # List of offline nodes
946
    n_drained = [] # List of nodes being drained
947
    node_volume = {}
948
    node_instance = {}
949
    node_info = {}
950
    instance_cfg = {}
951

    
952
    # FIXME: verify OS list
953
    # do local checksums
954
    master_files = [constants.CLUSTER_CONF_FILE]
955

    
956
    file_names = ssconf.SimpleStore().GetFileList()
957
    file_names.append(constants.SSL_CERT_FILE)
958
    file_names.append(constants.RAPI_CERT_FILE)
959
    file_names.extend(master_files)
960

    
961
    local_checksums = utils.FingerprintFiles(file_names)
962

    
963
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
964
    node_verify_param = {
965
      constants.NV_FILELIST: file_names,
966
      constants.NV_NODELIST: [node.name for node in nodeinfo
967
                              if not node.offline],
968
      constants.NV_HYPERVISOR: hypervisors,
969
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
970
                                  node.secondary_ip) for node in nodeinfo
971
                                 if not node.offline],
972
      constants.NV_INSTANCELIST: hypervisors,
973
      constants.NV_VERSION: None,
974
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
975
      }
976
    if vg_name is not None:
977
      node_verify_param[constants.NV_VGLIST] = None
978
      node_verify_param[constants.NV_LVLIST] = vg_name
979
      node_verify_param[constants.NV_DRBDLIST] = None
980
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
981
                                           self.cfg.GetClusterName())
982

    
983
    cluster = self.cfg.GetClusterInfo()
984
    master_node = self.cfg.GetMasterNode()
985
    all_drbd_map = self.cfg.ComputeDRBDMap()
986

    
987
    for node_i in nodeinfo:
988
      node = node_i.name
989
      nresult = all_nvinfo[node].data
990

    
991
      if node_i.offline:
992
        feedback_fn("* Skipping offline node %s" % (node,))
993
        n_offline.append(node)
994
        continue
995

    
996
      if node == master_node:
997
        ntype = "master"
998
      elif node_i.master_candidate:
999
        ntype = "master candidate"
1000
      elif node_i.drained:
1001
        ntype = "drained"
1002
        n_drained.append(node)
1003
      else:
1004
        ntype = "regular"
1005
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1006

    
1007
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1008
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1009
        bad = True
1010
        continue
1011

    
1012
      node_drbd = {}
1013
      for minor, instance in all_drbd_map[node].items():
1014
        instance = instanceinfo[instance]
1015
        node_drbd[minor] = (instance.name, instance.admin_up)
1016
      result = self._VerifyNode(node_i, file_names, local_checksums,
1017
                                nresult, feedback_fn, master_files,
1018
                                node_drbd, vg_name)
1019
      bad = bad or result
1020

    
1021
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1022
      if vg_name is None:
1023
        node_volume[node] = {}
1024
      elif isinstance(lvdata, basestring):
1025
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1026
                    (node, utils.SafeEncode(lvdata)))
1027
        bad = True
1028
        node_volume[node] = {}
1029
      elif not isinstance(lvdata, dict):
1030
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1031
        bad = True
1032
        continue
1033
      else:
1034
        node_volume[node] = lvdata
1035

    
1036
      # node_instance
1037
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1038
      if not isinstance(idata, list):
1039
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1040
                    (node,))
1041
        bad = True
1042
        continue
1043

    
1044
      node_instance[node] = idata
1045

    
1046
      # node_info
1047
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1048
      if not isinstance(nodeinfo, dict):
1049
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1050
        bad = True
1051
        continue
1052

    
1053
      try:
1054
        node_info[node] = {
1055
          "mfree": int(nodeinfo['memory_free']),
1056
          "pinst": [],
1057
          "sinst": [],
1058
          # dictionary holding all instances this node is secondary for,
1059
          # grouped by their primary node. Each key is a cluster node, and each
1060
          # value is a list of instances which have the key as primary and the
1061
          # current node as secondary.  This is handy to calculate N+1 memory
1062
          # availability if you can only failover from a primary to its
1063
          # secondary.
1064
          "sinst-by-pnode": {},
1065
        }
1066
        # FIXME: devise a free space model for file based instances as well
1067
        if vg_name is not None:
1068
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1069
      except ValueError:
1070
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1071
        bad = True
1072
        continue
1073

    
1074
    node_vol_should = {}
1075

    
1076
    for instance in instancelist:
1077
      feedback_fn("* Verifying instance %s" % instance)
1078
      inst_config = instanceinfo[instance]
1079
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1080
                                     node_instance, feedback_fn, n_offline)
1081
      bad = bad or result
1082
      inst_nodes_offline = []
1083

    
1084
      inst_config.MapLVsByNode(node_vol_should)
1085

    
1086
      instance_cfg[instance] = inst_config
1087

    
1088
      pnode = inst_config.primary_node
1089
      if pnode in node_info:
1090
        node_info[pnode]['pinst'].append(instance)
1091
      elif pnode not in n_offline:
1092
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1093
                    " %s failed" % (instance, pnode))
1094
        bad = True
1095

    
1096
      if pnode in n_offline:
1097
        inst_nodes_offline.append(pnode)
1098

    
1099
      # If the instance is non-redundant we cannot survive losing its primary
1100
      # node, so we are not N+1 compliant. On the other hand we have no disk
1101
      # templates with more than one secondary so that situation is not well
1102
      # supported either.
1103
      # FIXME: does not support file-backed instances
1104
      if len(inst_config.secondary_nodes) == 0:
1105
        i_non_redundant.append(instance)
1106
      elif len(inst_config.secondary_nodes) > 1:
1107
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1108
                    % instance)
1109

    
1110
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1111
        i_non_a_balanced.append(instance)
1112

    
1113
      for snode in inst_config.secondary_nodes:
1114
        if snode in node_info:
1115
          node_info[snode]['sinst'].append(instance)
1116
          if pnode not in node_info[snode]['sinst-by-pnode']:
1117
            node_info[snode]['sinst-by-pnode'][pnode] = []
1118
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1119
        elif snode not in n_offline:
1120
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1121
                      " %s failed" % (instance, snode))
1122
          bad = True
1123
        if snode in n_offline:
1124
          inst_nodes_offline.append(snode)
1125

    
1126
      if inst_nodes_offline:
1127
        # warn that the instance lives on offline nodes, and set bad=True
1128
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1129
                    ", ".join(inst_nodes_offline))
1130
        bad = True
1131

    
1132
    feedback_fn("* Verifying orphan volumes")
1133
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1134
                                       feedback_fn)
1135
    bad = bad or result
1136

    
1137
    feedback_fn("* Verifying remaining instances")
1138
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1139
                                         feedback_fn)
1140
    bad = bad or result
1141

    
1142
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1143
      feedback_fn("* Verifying N+1 Memory redundancy")
1144
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1145
      bad = bad or result
1146

    
1147
    feedback_fn("* Other Notes")
1148
    if i_non_redundant:
1149
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1150
                  % len(i_non_redundant))
1151

    
1152
    if i_non_a_balanced:
1153
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1154
                  % len(i_non_a_balanced))
1155

    
1156
    if n_offline:
1157
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1158

    
1159
    if n_drained:
1160
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1161

    
1162
    return not bad
1163

    
1164
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1165
    """Analyze the post-hooks' result
1166

1167
    This method analyzes the hook result, handles it, and sends some
1168
    nicely-formatted feedback back to the user.
1169

1170
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1171
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1172
    @param hooks_results: the results of the multi-node hooks rpc call
1173
    @param feedback_fn: function used to send feedback back to the caller
1174
    @param lu_result: previous Exec result
1175
    @return: the new Exec result, based on the previous result
1176
        and hook results
1177

1178
    """
1179
    # We only really run POST phase hooks, and are only interested in
1180
    # their results
1181
    if phase == constants.HOOKS_PHASE_POST:
1182
      # Used to change hooks' output to proper indentation
1183
      indent_re = re.compile('^', re.M)
1184
      feedback_fn("* Hooks Results")
1185
      if not hooks_results:
1186
        feedback_fn("  - ERROR: general communication failure")
1187
        lu_result = 1
1188
      else:
1189
        for node_name in hooks_results:
1190
          show_node_header = True
1191
          res = hooks_results[node_name]
1192
          if res.failed or res.data is False or not isinstance(res.data, list):
1193
            if res.offline:
1194
              # no need to warn or set fail return value
1195
              continue
1196
            feedback_fn("    Communication failure in hooks execution")
1197
            lu_result = 1
1198
            continue
1199
          for script, hkr, output in res.data:
1200
            if hkr == constants.HKR_FAIL:
1201
              # The node header is only shown once, if there are
1202
              # failing hooks on that node
1203
              if show_node_header:
1204
                feedback_fn("  Node %s:" % node_name)
1205
                show_node_header = False
1206
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1207
              output = indent_re.sub('      ', output)
1208
              feedback_fn("%s" % output)
1209
              lu_result = 1
1210

    
1211
      return lu_result
1212

    
1213

    
1214
class LUVerifyDisks(NoHooksLU):
1215
  """Verifies the cluster disks status.
1216

1217
  """
1218
  _OP_REQP = []
1219
  REQ_BGL = False
1220

    
1221
  def ExpandNames(self):
1222
    self.needed_locks = {
1223
      locking.LEVEL_NODE: locking.ALL_SET,
1224
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1225
    }
1226
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1227

    
1228
  def CheckPrereq(self):
1229
    """Check prerequisites.
1230

1231
    This has no prerequisites.
1232

1233
    """
1234
    pass
1235

    
1236
  def Exec(self, feedback_fn):
1237
    """Verify integrity of cluster disks.
1238

1239
    """
1240
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1241

    
1242
    vg_name = self.cfg.GetVGName()
1243
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1244
    instances = [self.cfg.GetInstanceInfo(name)
1245
                 for name in self.cfg.GetInstanceList()]
1246

    
1247
    nv_dict = {}
1248
    for inst in instances:
1249
      inst_lvs = {}
1250
      if (not inst.admin_up or
1251
          inst.disk_template not in constants.DTS_NET_MIRROR):
1252
        continue
1253
      inst.MapLVsByNode(inst_lvs)
1254
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
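      # e.g. (illustrative) an instance whose inst_lvs comes out as
      # {"node1": ["lv_a", "lv_b"]} adds the nv_dict entries
      # {("node1", "lv_a"): inst, ("node1", "lv_b"): inst}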
1255
      for node, vol_list in inst_lvs.iteritems():
1256
        for vol in vol_list:
1257
          nv_dict[(node, vol)] = inst
1258

    
1259
    if not nv_dict:
1260
      return result
1261

    
1262
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1263

    
1264
    to_act = set()
1265
    for node in nodes:
1266
      # node_volume
1267
      lvs = node_lvs[node]
1268
      if lvs.failed:
1269
        if not lvs.offline:
1270
          self.LogWarning("Connection to node %s failed: %s" %
1271
                          (node, lvs.data))
1272
        continue
1273
      lvs = lvs.data
1274
      if isinstance(lvs, basestring):
1275
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1276
        res_nlvm[node] = lvs
1277
      elif not isinstance(lvs, dict):
1278
        logging.warning("Connection to node %s failed or invalid data"
1279
                        " returned", node)
1280
        res_nodes.append(node)
1281
        continue
1282

    
1283
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1284
        inst = nv_dict.pop((node, lv_name), None)
1285
        if (not lv_online and inst is not None
1286
            and inst.name not in res_instances):
1287
          res_instances.append(inst.name)
1288

    
1289
    # any leftover items in nv_dict are missing LVs, let's arrange the
1290
    # data better
1291
    for key, inst in nv_dict.iteritems():
1292
      if inst.name not in res_missing:
1293
        res_missing[inst.name] = []
1294
      res_missing[inst.name].append(key)
1295

    
1296
    return result
1297

    
1298

    
1299
class LURenameCluster(LogicalUnit):
1300
  """Rename the cluster.
1301

1302
  """
1303
  HPATH = "cluster-rename"
1304
  HTYPE = constants.HTYPE_CLUSTER
1305
  _OP_REQP = ["name"]
1306

    
1307
  def BuildHooksEnv(self):
1308
    """Build hooks env.
1309

1310
    """
1311
    env = {
1312
      "OP_TARGET": self.cfg.GetClusterName(),
1313
      "NEW_NAME": self.op.name,
1314
      }
1315
    mn = self.cfg.GetMasterNode()
1316
    return env, [mn], [mn]
1317

    
1318
  def CheckPrereq(self):
1319
    """Verify that the passed name is a valid one.
1320

1321
    """
1322
    hostname = utils.HostInfo(self.op.name)
1323

    
1324
    new_name = hostname.name
1325
    self.ip = new_ip = hostname.ip
1326
    old_name = self.cfg.GetClusterName()
1327
    old_ip = self.cfg.GetMasterIP()
1328
    if new_name == old_name and new_ip == old_ip:
1329
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1330
                                 " cluster has changed")
1331
    if new_ip != old_ip:
1332
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1333
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1334
                                   " reachable on the network. Aborting." %
1335
                                   new_ip)
1336

    
1337
    self.op.name = new_name
1338

    
1339
  def Exec(self, feedback_fn):
1340
    """Rename the cluster.
1341

1342
    """
1343
    clustername = self.op.name
1344
    ip = self.ip
1345

    
1346
    # shutdown the master IP
1347
    master = self.cfg.GetMasterNode()
1348
    result = self.rpc.call_node_stop_master(master, False)
1349
    if result.failed or not result.data:
1350
      raise errors.OpExecError("Could not disable the master role")
1351

    
1352
    try:
1353
      cluster = self.cfg.GetClusterInfo()
1354
      cluster.cluster_name = clustername
1355
      cluster.master_ip = ip
1356
      self.cfg.Update(cluster)
1357

    
1358
      # update the known hosts file
1359
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1360
      node_list = self.cfg.GetNodeList()
1361
      try:
1362
        node_list.remove(master)
1363
      except ValueError:
1364
        pass
1365
      result = self.rpc.call_upload_file(node_list,
1366
                                         constants.SSH_KNOWN_HOSTS_FILE)
1367
      for to_node, to_result in result.iteritems():
1368
        if to_result.failed or not to_result.data:
1369
          logging.error("Copy of file %s to node %s failed",
1370
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1371

    
1372
    finally:
1373
      result = self.rpc.call_node_start_master(master, False)
1374
      if result.failed or not result.data:
1375
        self.LogWarning("Could not re-enable the master role on"
1376
                        " the master, please restart manually.")
1377

    
1378

    
1379
def _RecursiveCheckIfLVMBased(disk):
1380
  """Check if the given disk or its children are lvm-based.
1381

1382
  @type disk: L{objects.Disk}
1383
  @param disk: the disk to check
1384
  @rtype: boolean
1385
  @return: boolean indicating whether a LD_LV dev_type was found or not
1386

1387
  """
1388
  if disk.children:
1389
    for chdisk in disk.children:
1390
      if _RecursiveCheckIfLVMBased(chdisk):
1391
        return True
1392
  return disk.dev_type == constants.LD_LV
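
# Illustrative example: a DRBD-based disk whose children are two LV devices
# makes the recursion above find an LD_LV child and return True, while a
# file-based disk with no children returns False.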
1393

    
1394

    
1395
class LUSetClusterParams(LogicalUnit):
1396
  """Change the parameters of the cluster.
1397

1398
  """
1399
  HPATH = "cluster-modify"
1400
  HTYPE = constants.HTYPE_CLUSTER
1401
  _OP_REQP = []
1402
  REQ_BGL = False
1403

    
1404
  def CheckParameters(self):
1405
    """Check parameters
1406

1407
    """
1408
    if not hasattr(self.op, "candidate_pool_size"):
1409
      self.op.candidate_pool_size = None
1410
    if self.op.candidate_pool_size is not None:
1411
      try:
1412
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1413
      except ValueError, err:
1414
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1415
                                   str(err))
1416
      if self.op.candidate_pool_size < 1:
1417
        raise errors.OpPrereqError("At least one master candidate needed")
1418

    
1419
  def ExpandNames(self):
1420
    # FIXME: in the future maybe other cluster params won't require checking on
1421
    # all nodes to be modified.
1422
    self.needed_locks = {
1423
      locking.LEVEL_NODE: locking.ALL_SET,
1424
    }
1425
    self.share_locks[locking.LEVEL_NODE] = 1
1426

    
1427
  def BuildHooksEnv(self):
1428
    """Build hooks env.
1429

1430
    """
1431
    env = {
1432
      "OP_TARGET": self.cfg.GetClusterName(),
1433
      "NEW_VG_NAME": self.op.vg_name,
1434
      }
1435
    mn = self.cfg.GetMasterNode()
1436
    return env, [mn], [mn]
1437

    
1438
  def CheckPrereq(self):
1439
    """Check prerequisites.
1440

1441
    This checks whether the given params don't conflict and
1442
    if the given volume group is valid.
1443

1444
    """
1445
    if self.op.vg_name is not None and not self.op.vg_name:
1446
      instances = self.cfg.GetAllInstancesInfo().values()
1447
      for inst in instances:
1448
        for disk in inst.disks:
1449
          if _RecursiveCheckIfLVMBased(disk):
1450
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1451
                                       " lvm-based instances exist")
1452

    
1453
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1454

    
1455
    # if vg_name is not None, check the given volume group on all nodes
1456
    if self.op.vg_name:
1457
      vglist = self.rpc.call_vg_list(node_list)
1458
      for node in node_list:
1459
        if vglist[node].failed:
1460
          # ignoring down node
1461
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1462
          continue
1463
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1464
                                              self.op.vg_name,
1465
                                              constants.MIN_VG_SIZE)
1466
        if vgstatus:
1467
          raise errors.OpPrereqError("Error on node '%s': %s" %
1468
                                     (node, vgstatus))
1469

    
1470
    self.cluster = cluster = self.cfg.GetClusterInfo()
1471
    # validate beparams changes
1472
    if self.op.beparams:
1473
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1474
      self.new_beparams = cluster.FillDict(
1475
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1476

    
1477
    # hypervisor list/parameters
1478
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1479
    if self.op.hvparams:
1480
      if not isinstance(self.op.hvparams, dict):
1481
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1482
      for hv_name, hv_dict in self.op.hvparams.items():
1483
        if hv_name not in self.new_hvparams:
1484
          self.new_hvparams[hv_name] = hv_dict
1485
        else:
1486
          self.new_hvparams[hv_name].update(hv_dict)
1487

    
1488
    if self.op.enabled_hypervisors is not None:
1489
      self.hv_list = self.op.enabled_hypervisors
1490
    else:
1491
      self.hv_list = cluster.enabled_hypervisors
1492

    
1493
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1494
      # either the enabled list has changed, or the parameters have, validate
1495
      for hv_name, hv_params in self.new_hvparams.items():
1496
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1497
            (self.op.enabled_hypervisors and
1498
             hv_name in self.op.enabled_hypervisors)):
1499
          # either this is a new hypervisor, or its parameters have changed
1500
          hv_class = hypervisor.GetHypervisor(hv_name)
1501
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1502
          hv_class.CheckParameterSyntax(hv_params)
1503
          _CheckHVParams(self, node_list, hv_name, hv_params)
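
  # A small sketch of the hvparams merge performed above (hypervisor and
  # parameter values are illustrative only): per-hypervisor dicts supplied in
  # the opcode update the copied cluster-level dicts key by key instead of
  # replacing them wholesale.
  #
  #   cluster.hvparams  = {"xen-pvm": {"kernel_path": "/boot/vmlinuz",
  #                                    "initrd_path": ""}}
  #   self.op.hvparams  = {"xen-pvm": {"initrd_path": "/boot/initrd"}}
  #   self.new_hvparams = {"xen-pvm": {"kernel_path": "/boot/vmlinuz",
  #                                    "initrd_path": "/boot/initrd"}}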
1504

    
1505
  def Exec(self, feedback_fn):
1506
    """Change the parameters of the cluster.
1507

1508
    """
1509
    if self.op.vg_name is not None:
1510
      if self.op.vg_name != self.cfg.GetVGName():
1511
        self.cfg.SetVGName(self.op.vg_name)
1512
      else:
1513
        feedback_fn("Cluster LVM configuration already in desired"
1514
                    " state, not changing")
1515
    if self.op.hvparams:
1516
      self.cluster.hvparams = self.new_hvparams
1517
    if self.op.enabled_hypervisors is not None:
1518
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1519
    if self.op.beparams:
1520
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1521
    if self.op.candidate_pool_size is not None:
1522
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1523

    
1524
    self.cfg.Update(self.cluster)
1525

    
1526
    # we want to update nodes after the cluster so that if any errors
1527
    # happen, we have recorded and saved the cluster info
1528
    if self.op.candidate_pool_size is not None:
1529
      _AdjustCandidatePool(self)
1530

    
1531

    
1532
class LURedistributeConfig(NoHooksLU):
1533
  """Force the redistribution of cluster configuration.
1534

1535
  This is a very simple LU.
1536

1537
  """
1538
  _OP_REQP = []
1539
  REQ_BGL = False
1540

    
1541
  def ExpandNames(self):
1542
    self.needed_locks = {
1543
      locking.LEVEL_NODE: locking.ALL_SET,
1544
    }
1545
    self.share_locks[locking.LEVEL_NODE] = 1
1546

    
1547
  def CheckPrereq(self):
1548
    """Check prerequisites.
1549

1550
    """
1551

    
1552
  def Exec(self, feedback_fn):
1553
    """Redistribute the configuration.
1554

1555
    """
1556
    self.cfg.Update(self.cfg.GetClusterInfo())
1557

    
1558

    
1559
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1560
  """Sleep and poll for an instance's disk to sync.
1561

1562
  """
1563
  if not instance.disks:
1564
    return True
1565

    
1566
  if not oneshot:
1567
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1568

    
1569
  node = instance.primary_node
1570

    
1571
  for dev in instance.disks:
1572
    lu.cfg.SetDiskID(dev, node)
1573

    
1574
  retries = 0
1575
  while True:
1576
    max_time = 0
1577
    done = True
1578
    cumul_degraded = False
1579
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1580
    if rstats.failed or not rstats.data:
1581
      lu.LogWarning("Can't get any data from node %s", node)
1582
      retries += 1
1583
      if retries >= 10:
1584
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1585
                                 " aborting." % node)
1586
      time.sleep(6)
1587
      continue
1588
    rstats = rstats.data
1589
    retries = 0
1590
    for i, mstat in enumerate(rstats):
1591
      if mstat is None:
1592
        lu.LogWarning("Can't compute data for node %s/%s",
1593
                           node, instance.disks[i].iv_name)
1594
        continue
1595
      # we ignore the ldisk parameter
1596
      perc_done, est_time, is_degraded, _ = mstat
1597
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1598
      if perc_done is not None:
1599
        done = False
1600
        if est_time is not None:
1601
          rem_time = "%d estimated seconds remaining" % est_time
1602
          max_time = est_time
1603
        else:
1604
          rem_time = "no time estimate"
1605
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1606
                        (instance.disks[i].iv_name, perc_done, rem_time))
1607
    if done or oneshot:
1608
      break
1609

    
1610
    time.sleep(min(60, max_time))
1611

    
1612
  if done:
1613
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1614
  return not cumul_degraded
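
# Shape of the per-disk status tuple consumed in the loop above, as returned
# by call_blockdev_getmirrorstatus (the numbers are invented): a disk that is
# fully synced reports perc_done as None, which is what lets the wait loop
# finish.
#
#   perc_done, est_time, is_degraded, ldisk = (78.5, 42, True, False)
#   # 78.5% resynced, about 42s remaining, mirror still degraded,
#   # local storage healthy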
1615

    
1616

    
1617
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1618
  """Check that mirrors are not degraded.
1619

1620
  The ldisk parameter, if True, will change the test from the
1621
  is_degraded attribute (which represents overall non-ok status for
1622
  the device(s)) to the ldisk (representing the local storage status).
1623

1624
  """
1625
  lu.cfg.SetDiskID(dev, node)
1626
  if ldisk:
1627
    idx = 6
1628
  else:
1629
    idx = 5
1630

    
1631
  result = True
1632
  if on_primary or dev.AssembleOnSecondary():
1633
    rstats = lu.rpc.call_blockdev_find(node, dev)
1634
    msg = rstats.RemoteFailMsg()
1635
    if msg:
1636
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1637
      result = False
1638
    elif not rstats.payload:
1639
      lu.LogWarning("Can't find disk on node %s", node)
1640
      result = False
1641
    else:
1642
      result = result and (not rstats.payload[idx])
1643
  if dev.children:
1644
    for child in dev.children:
1645
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1646

    
1647
  return result
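
# The payload indexed above is the status tuple returned by
# call_blockdev_find; assuming the historical layout (dev_path, major, minor,
# sync_percent, est_time, is_degraded, ldisk), index 5 is the overall
# degradation flag and index 6 the local-disk flag, which is the only thing
# the ldisk switch changes:
#
#   payload = ("/dev/drbd0", 147, 0, 100.0, None, False, False)  # invented
#   healthy = not payload[6 if ldisk else 5]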
1648

    
1649

    
1650
class LUDiagnoseOS(NoHooksLU):
1651
  """Logical unit for OS diagnose/query.
1652

1653
  """
1654
  _OP_REQP = ["output_fields", "names"]
1655
  REQ_BGL = False
1656
  _FIELDS_STATIC = utils.FieldSet()
1657
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1658

    
1659
  def ExpandNames(self):
1660
    if self.op.names:
1661
      raise errors.OpPrereqError("Selective OS query not supported")
1662

    
1663
    _CheckOutputFields(static=self._FIELDS_STATIC,
1664
                       dynamic=self._FIELDS_DYNAMIC,
1665
                       selected=self.op.output_fields)
1666

    
1667
    # Lock all nodes, in shared mode
1668
    self.needed_locks = {}
1669
    self.share_locks[locking.LEVEL_NODE] = 1
1670
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1671

    
1672
  def CheckPrereq(self):
1673
    """Check prerequisites.
1674

1675
    """
1676

    
1677
  @staticmethod
1678
  def _DiagnoseByOS(node_list, rlist):
1679
    """Remaps a per-node return list into an a per-os per-node dictionary
1680

1681
    @param node_list: a list with the names of all nodes
1682
    @param rlist: a map with node names as keys and OS objects as values
1683

1684
    @rtype: dict
1685
    @return: a dictionary with osnames as keys and as value another map, with
1686
        nodes as keys and list of OS objects as values, eg::
1687

1688
          {"debian-etch": {"node1": [<object>,...],
1689
                           "node2": [<object>,]}
1690
          }
1691

1692
    """
1693
    all_os = {}
1694
    for node_name, nr in rlist.iteritems():
1695
      if nr.failed or not nr.data:
1696
        continue
1697
      for os_obj in nr.data:
1698
        if os_obj.name not in all_os:
1699
          # build a list of nodes for this os containing empty lists
1700
          # for each node in node_list
1701
          all_os[os_obj.name] = {}
1702
          for nname in node_list:
1703
            all_os[os_obj.name][nname] = []
1704
        all_os[os_obj.name][node_name].append(os_obj)
1705
    return all_os
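
  # A small sketch of the remapping above (node and OS names invented): the
  # RPC result is keyed by node, while the output is keyed by OS name and
  # then by node, and every node from node_list appears even if it reported
  # nothing for that OS.
  #
  #   node_list = ["node1", "node2"]
  #   rlist     = {"node1": <result listing OS "debian-etch">,
  #                "node2": <result with no OS data>}
  #   _DiagnoseByOS(node_list, rlist)
  #   # -> {"debian-etch": {"node1": [<OS object>], "node2": []}}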
1706

    
1707
  def Exec(self, feedback_fn):
1708
    """Compute the list of OSes.
1709

1710
    """
1711
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1712
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1713
                   if node in node_list]
1714
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1715
    if node_data == False:
1716
      raise errors.OpExecError("Can't gather the list of OSes")
1717
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1718
    output = []
1719
    for os_name, os_data in pol.iteritems():
1720
      row = []
1721
      for field in self.op.output_fields:
1722
        if field == "name":
1723
          val = os_name
1724
        elif field == "valid":
1725
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1726
        elif field == "node_status":
1727
          val = {}
1728
          for node_name, nos_list in os_data.iteritems():
1729
            val[node_name] = [(v.status, v.path) for v in nos_list]
1730
        else:
1731
          raise errors.ParameterError(field)
1732
        row.append(val)
1733
      output.append(row)
1734

    
1735
    return output
1736

    
1737

    
1738
class LURemoveNode(LogicalUnit):
1739
  """Logical unit for removing a node.
1740

1741
  """
1742
  HPATH = "node-remove"
1743
  HTYPE = constants.HTYPE_NODE
1744
  _OP_REQP = ["node_name"]
1745

    
1746
  def BuildHooksEnv(self):
1747
    """Build hooks env.
1748

1749
    This doesn't run on the target node in the pre phase as a failed
1750
    node would then be impossible to remove.
1751

1752
    """
1753
    env = {
1754
      "OP_TARGET": self.op.node_name,
1755
      "NODE_NAME": self.op.node_name,
1756
      }
1757
    all_nodes = self.cfg.GetNodeList()
1758
    all_nodes.remove(self.op.node_name)
1759
    return env, all_nodes, all_nodes
1760

    
1761
  def CheckPrereq(self):
1762
    """Check prerequisites.
1763

1764
    This checks:
1765
     - the node exists in the configuration
1766
     - it does not have primary or secondary instances
1767
     - it's not the master
1768

1769
    Any errors are signalled by raising errors.OpPrereqError.
1770

1771
    """
1772
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1773
    if node is None:
1774
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1775

    
1776
    instance_list = self.cfg.GetInstanceList()
1777

    
1778
    masternode = self.cfg.GetMasterNode()
1779
    if node.name == masternode:
1780
      raise errors.OpPrereqError("Node is the master node,"
1781
                                 " you need to failover first.")
1782

    
1783
    for instance_name in instance_list:
1784
      instance = self.cfg.GetInstanceInfo(instance_name)
1785
      if node.name in instance.all_nodes:
1786
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1787
                                   " please remove first." % instance_name)
1788
    self.op.node_name = node.name
1789
    self.node = node
1790

    
1791
  def Exec(self, feedback_fn):
1792
    """Removes the node from the cluster.
1793

1794
    """
1795
    node = self.node
1796
    logging.info("Stopping the node daemon and removing configs from node %s",
1797
                 node.name)
1798

    
1799
    self.context.RemoveNode(node.name)
1800

    
1801
    self.rpc.call_node_leave_cluster(node.name)
1802

    
1803
    # Promote nodes to master candidate as needed
1804
    _AdjustCandidatePool(self)
1805

    
1806

    
1807
class LUQueryNodes(NoHooksLU):
1808
  """Logical unit for querying nodes.
1809

1810
  """
1811
  _OP_REQP = ["output_fields", "names", "use_locking"]
1812
  REQ_BGL = False
1813
  _FIELDS_DYNAMIC = utils.FieldSet(
1814
    "dtotal", "dfree",
1815
    "mtotal", "mnode", "mfree",
1816
    "bootid",
1817
    "ctotal", "cnodes", "csockets",
1818
    )
1819

    
1820
  _FIELDS_STATIC = utils.FieldSet(
1821
    "name", "pinst_cnt", "sinst_cnt",
1822
    "pinst_list", "sinst_list",
1823
    "pip", "sip", "tags",
1824
    "serial_no",
1825
    "master_candidate",
1826
    "master",
1827
    "offline",
1828
    "drained",
1829
    )
1830

    
1831
  def ExpandNames(self):
1832
    _CheckOutputFields(static=self._FIELDS_STATIC,
1833
                       dynamic=self._FIELDS_DYNAMIC,
1834
                       selected=self.op.output_fields)
1835

    
1836
    self.needed_locks = {}
1837
    self.share_locks[locking.LEVEL_NODE] = 1
1838

    
1839
    if self.op.names:
1840
      self.wanted = _GetWantedNodes(self, self.op.names)
1841
    else:
1842
      self.wanted = locking.ALL_SET
1843

    
1844
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1845
    self.do_locking = self.do_node_query and self.op.use_locking
1846
    if self.do_locking:
1847
      # if we don't request only static fields, we need to lock the nodes
1848
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1849

    
1850

    
1851
  def CheckPrereq(self):
1852
    """Check prerequisites.
1853

1854
    """
1855
    # The validation of the node list is done in _GetWantedNodes if the
    # list is non-empty; an empty list needs no validation
1857
    pass
1858

    
1859
  def Exec(self, feedback_fn):
1860
    """Computes the list of nodes and their attributes.
1861

1862
    """
1863
    all_info = self.cfg.GetAllNodesInfo()
1864
    if self.do_locking:
1865
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1866
    elif self.wanted != locking.ALL_SET:
1867
      nodenames = self.wanted
1868
      missing = set(nodenames).difference(all_info.keys())
1869
      if missing:
1870
        raise errors.OpExecError(
1871
          "Some nodes were removed before retrieving their data: %s" % missing)
1872
    else:
1873
      nodenames = all_info.keys()
1874

    
1875
    nodenames = utils.NiceSort(nodenames)
1876
    nodelist = [all_info[name] for name in nodenames]
1877

    
1878
    # begin data gathering
1879

    
1880
    if self.do_node_query:
1881
      live_data = {}
1882
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1883
                                          self.cfg.GetHypervisorType())
1884
      for name in nodenames:
1885
        nodeinfo = node_data[name]
1886
        if not nodeinfo.failed and nodeinfo.data:
1887
          nodeinfo = nodeinfo.data
1888
          fn = utils.TryConvert
1889
          live_data[name] = {
1890
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1891
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1892
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1893
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1894
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1895
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1896
            "bootid": nodeinfo.get('bootid', None),
1897
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1898
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1899
            }
1900
        else:
1901
          live_data[name] = {}
1902
    else:
1903
      live_data = dict.fromkeys(nodenames, {})
1904

    
1905
    node_to_primary = dict([(name, set()) for name in nodenames])
1906
    node_to_secondary = dict([(name, set()) for name in nodenames])
1907

    
1908
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1909
                             "sinst_cnt", "sinst_list"))
1910
    if inst_fields & frozenset(self.op.output_fields):
1911
      instancelist = self.cfg.GetInstanceList()
1912

    
1913
      for instance_name in instancelist:
1914
        inst = self.cfg.GetInstanceInfo(instance_name)
1915
        if inst.primary_node in node_to_primary:
1916
          node_to_primary[inst.primary_node].add(inst.name)
1917
        for secnode in inst.secondary_nodes:
1918
          if secnode in node_to_secondary:
1919
            node_to_secondary[secnode].add(inst.name)
1920

    
1921
    master_node = self.cfg.GetMasterNode()
1922

    
1923
    # end data gathering
1924

    
1925
    output = []
1926
    for node in nodelist:
1927
      node_output = []
1928
      for field in self.op.output_fields:
1929
        if field == "name":
1930
          val = node.name
1931
        elif field == "pinst_list":
1932
          val = list(node_to_primary[node.name])
1933
        elif field == "sinst_list":
1934
          val = list(node_to_secondary[node.name])
1935
        elif field == "pinst_cnt":
1936
          val = len(node_to_primary[node.name])
1937
        elif field == "sinst_cnt":
1938
          val = len(node_to_secondary[node.name])
1939
        elif field == "pip":
1940
          val = node.primary_ip
1941
        elif field == "sip":
1942
          val = node.secondary_ip
1943
        elif field == "tags":
1944
          val = list(node.GetTags())
1945
        elif field == "serial_no":
1946
          val = node.serial_no
1947
        elif field == "master_candidate":
1948
          val = node.master_candidate
1949
        elif field == "master":
1950
          val = node.name == master_node
1951
        elif field == "offline":
1952
          val = node.offline
1953
        elif field == "drained":
1954
          val = node.drained
1955
        elif self._FIELDS_DYNAMIC.Matches(field):
1956
          val = live_data[node.name].get(field, None)
1957
        else:
1958
          raise errors.ParameterError(field)
1959
        node_output.append(val)
1960
      output.append(node_output)
1961

    
1962
    return output
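
  # Rough shape of the result built above (names and figures illustrative):
  # one row per node, one value per requested field, in the order given by
  # output_fields.
  #
  #   output_fields = ["name", "pinst_cnt", "master", "offline"]
  #   output        = [["node1.example.com", 2, True,  False],
  #                    ["node2.example.com", 1, False, False]]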
1963

    
1964

    
1965
class LUQueryNodeVolumes(NoHooksLU):
1966
  """Logical unit for getting volumes on node(s).
1967

1968
  """
1969
  _OP_REQP = ["nodes", "output_fields"]
1970
  REQ_BGL = False
1971
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1972
  _FIELDS_STATIC = utils.FieldSet("node")
1973

    
1974
  def ExpandNames(self):
1975
    _CheckOutputFields(static=self._FIELDS_STATIC,
1976
                       dynamic=self._FIELDS_DYNAMIC,
1977
                       selected=self.op.output_fields)
1978

    
1979
    self.needed_locks = {}
1980
    self.share_locks[locking.LEVEL_NODE] = 1
1981
    if not self.op.nodes:
1982
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1983
    else:
1984
      self.needed_locks[locking.LEVEL_NODE] = \
1985
        _GetWantedNodes(self, self.op.nodes)
1986

    
1987
  def CheckPrereq(self):
1988
    """Check prerequisites.
1989

1990
    This checks that the fields required are valid output fields.
1991

1992
    """
1993
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1994

    
1995
  def Exec(self, feedback_fn):
1996
    """Computes the list of nodes and their attributes.
1997

1998
    """
1999
    nodenames = self.nodes
2000
    volumes = self.rpc.call_node_volumes(nodenames)
2001

    
2002
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2003
             in self.cfg.GetInstanceList()]
2004

    
2005
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2006

    
2007
    output = []
2008
    for node in nodenames:
2009
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2010
        continue
2011

    
2012
      node_vols = volumes[node].data[:]
2013
      node_vols.sort(key=lambda vol: vol['dev'])
2014

    
2015
      for vol in node_vols:
2016
        node_output = []
2017
        for field in self.op.output_fields:
2018
          if field == "node":
2019
            val = node
2020
          elif field == "phys":
2021
            val = vol['dev']
2022
          elif field == "vg":
2023
            val = vol['vg']
2024
          elif field == "name":
2025
            val = vol['name']
2026
          elif field == "size":
2027
            val = int(float(vol['size']))
2028
          elif field == "instance":
2029
            for inst in ilist:
2030
              if node not in lv_by_node[inst]:
2031
                continue
2032
              if vol['name'] in lv_by_node[inst][node]:
2033
                val = inst.name
2034
                break
2035
            else:
2036
              val = '-'
2037
          else:
2038
            raise errors.ParameterError(field)
2039
          node_output.append(str(val))
2040

    
2041
        output.append(node_output)
2042

    
2043
    return output
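
  # Rough shape of the rows assembled above (sample data invented; note that
  # every value is stringified before being appended):
  #
  #   output_fields = ["node", "phys", "vg", "name", "size", "instance"]
  #   output        = [["node1", "/dev/sda3", "xenvg", "vol1", "10240",
  #                     "inst1.example.com"]]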
2044

    
2045

    
2046
class LUAddNode(LogicalUnit):
2047
  """Logical unit for adding node to the cluster.
2048

2049
  """
2050
  HPATH = "node-add"
2051
  HTYPE = constants.HTYPE_NODE
2052
  _OP_REQP = ["node_name"]
2053

    
2054
  def BuildHooksEnv(self):
2055
    """Build hooks env.
2056

2057
    This will run on all nodes before, and on all nodes + the new node after.
2058

2059
    """
2060
    env = {
2061
      "OP_TARGET": self.op.node_name,
2062
      "NODE_NAME": self.op.node_name,
2063
      "NODE_PIP": self.op.primary_ip,
2064
      "NODE_SIP": self.op.secondary_ip,
2065
      }
2066
    nodes_0 = self.cfg.GetNodeList()
2067
    nodes_1 = nodes_0 + [self.op.node_name, ]
2068
    return env, nodes_0, nodes_1
2069

    
2070
  def CheckPrereq(self):
2071
    """Check prerequisites.
2072

2073
    This checks:
2074
     - the new node is not already in the config
2075
     - it is resolvable
2076
     - its parameters (single/dual homed) match the cluster
2077

2078
    Any errors are signalled by raising errors.OpPrereqError.
2079

2080
    """
2081
    node_name = self.op.node_name
2082
    cfg = self.cfg
2083

    
2084
    dns_data = utils.HostInfo(node_name)
2085

    
2086
    node = dns_data.name
2087
    primary_ip = self.op.primary_ip = dns_data.ip
2088
    secondary_ip = getattr(self.op, "secondary_ip", None)
2089
    if secondary_ip is None:
2090
      secondary_ip = primary_ip
2091
    if not utils.IsValidIP(secondary_ip):
2092
      raise errors.OpPrereqError("Invalid secondary IP given")
2093
    self.op.secondary_ip = secondary_ip
2094

    
2095
    node_list = cfg.GetNodeList()
2096
    if not self.op.readd and node in node_list:
2097
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2098
                                 node)
2099
    elif self.op.readd and node not in node_list:
2100
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2101

    
2102
    for existing_node_name in node_list:
2103
      existing_node = cfg.GetNodeInfo(existing_node_name)
2104

    
2105
      if self.op.readd and node == existing_node_name:
2106
        if (existing_node.primary_ip != primary_ip or
2107
            existing_node.secondary_ip != secondary_ip):
2108
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2109
                                     " address configuration as before")
2110
        continue
2111

    
2112
      if (existing_node.primary_ip == primary_ip or
2113
          existing_node.secondary_ip == primary_ip or
2114
          existing_node.primary_ip == secondary_ip or
2115
          existing_node.secondary_ip == secondary_ip):
2116
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2117
                                   " existing node %s" % existing_node.name)
2118

    
2119
    # check that the type of the node (single versus dual homed) is the
2120
    # same as for the master
2121
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2122
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2123
    newbie_singlehomed = secondary_ip == primary_ip
2124
    if master_singlehomed != newbie_singlehomed:
2125
      if master_singlehomed:
2126
        raise errors.OpPrereqError("The master has no private ip but the"
2127
                                   " new node has one")
2128
      else:
2129
        raise errors.OpPrereqError("The master has a private ip but the"
2130
                                   " new node doesn't have one")
2131

    
2132
    # check reachability
2133
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2134
      raise errors.OpPrereqError("Node not reachable by ping")
2135

    
2136
    if not newbie_singlehomed:
2137
      # check reachability from my secondary ip to newbie's secondary ip
2138
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2139
                           source=myself.secondary_ip):
2140
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2141
                                   " based ping to noded port")
2142

    
2143
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2144
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2145
    master_candidate = mc_now < cp_size
2146

    
2147
    self.new_node = objects.Node(name=node,
2148
                                 primary_ip=primary_ip,
2149
                                 secondary_ip=secondary_ip,
2150
                                 master_candidate=master_candidate,
2151
                                 offline=False, drained=False)
2152

    
2153
  def Exec(self, feedback_fn):
2154
    """Adds the new node to the cluster.
2155

2156
    """
2157
    new_node = self.new_node
2158
    node = new_node.name
2159

    
2160
    # check connectivity
2161
    result = self.rpc.call_version([node])[node]
2162
    result.Raise()
2163
    if result.data:
2164
      if constants.PROTOCOL_VERSION == result.data:
2165
        logging.info("Communication to node %s fine, sw version %s match",
2166
                     node, result.data)
2167
      else:
2168
        raise errors.OpExecError("Version mismatch master version %s,"
2169
                                 " node version %s" %
2170
                                 (constants.PROTOCOL_VERSION, result.data))
2171
    else:
2172
      raise errors.OpExecError("Cannot get version from the new node")
2173

    
2174
    # setup ssh on node
2175
    logging.info("Copy ssh key to node %s", node)
2176
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2177
    keyarray = []
2178
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2179
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2180
                priv_key, pub_key]
2181

    
2182
    for i in keyfiles:
2183
      f = open(i, 'r')
2184
      try:
2185
        keyarray.append(f.read())
2186
      finally:
2187
        f.close()
2188

    
2189
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2190
                                    keyarray[2],
2191
                                    keyarray[3], keyarray[4], keyarray[5])
2192

    
2193
    msg = result.RemoteFailMsg()
2194
    if msg:
2195
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2196
                               " new node: %s" % msg)
2197

    
2198
    # Add node to our /etc/hosts, and add key to known_hosts
2199
    utils.AddHostToEtcHosts(new_node.name)
2200

    
2201
    if new_node.secondary_ip != new_node.primary_ip:
2202
      result = self.rpc.call_node_has_ip_address(new_node.name,
2203
                                                 new_node.secondary_ip)
2204
      if result.failed or not result.data:
2205
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2206
                                 " you gave (%s). Please fix and re-run this"
2207
                                 " command." % new_node.secondary_ip)
2208

    
2209
    node_verify_list = [self.cfg.GetMasterNode()]
2210
    node_verify_param = {
2211
      'nodelist': [node],
2212
      # TODO: do a node-net-test as well?
2213
    }
2214

    
2215
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2216
                                       self.cfg.GetClusterName())
2217
    for verifier in node_verify_list:
2218
      if result[verifier].failed or not result[verifier].data:
2219
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2220
                                 " for remote verification" % verifier)
2221
      if result[verifier].data['nodelist']:
2222
        for failed in result[verifier].data['nodelist']:
2223
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2224
                      (verifier, result[verifier].data['nodelist'][failed]))
2225
        raise errors.OpExecError("ssh/hostname verification failed.")
2226

    
2227
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2228
    # including the node just added
2229
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2230
    dist_nodes = self.cfg.GetNodeList()
2231
    if not self.op.readd:
2232
      dist_nodes.append(node)
2233
    if myself.name in dist_nodes:
2234
      dist_nodes.remove(myself.name)
2235

    
2236
    logging.debug("Copying hosts and known_hosts to all nodes")
2237
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2238
      result = self.rpc.call_upload_file(dist_nodes, fname)
2239
      for to_node, to_result in result.iteritems():
2240
        if to_result.failed or not to_result.data:
2241
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2242

    
2243
    to_copy = []
2244
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2245
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2246
      to_copy.append(constants.VNC_PASSWORD_FILE)
2247

    
2248
    for fname in to_copy:
2249
      result = self.rpc.call_upload_file([node], fname)
2250
      if result[node].failed or not result[node]:
2251
        logging.error("Could not copy file %s to node %s", fname, node)
2252

    
2253
    if self.op.readd:
2254
      self.context.ReaddNode(new_node)
2255
    else:
2256
      self.context.AddNode(new_node)
2257

    
2258

    
2259
class LUSetNodeParams(LogicalUnit):
2260
  """Modifies the parameters of a node.
2261

2262
  """
2263
  HPATH = "node-modify"
2264
  HTYPE = constants.HTYPE_NODE
2265
  _OP_REQP = ["node_name"]
2266
  REQ_BGL = False
2267

    
2268
  def CheckArguments(self):
2269
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2270
    if node_name is None:
2271
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2272
    self.op.node_name = node_name
2273
    _CheckBooleanOpField(self.op, 'master_candidate')
2274
    _CheckBooleanOpField(self.op, 'offline')
2275
    _CheckBooleanOpField(self.op, 'drained')
2276
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2277
    if all_mods.count(None) == 3:
2278
      raise errors.OpPrereqError("Please pass at least one modification")
2279
    if all_mods.count(True) > 1:
2280
      raise errors.OpPrereqError("Can't set the node into more than one"
2281
                                 " state at the same time")
2282

    
2283
  def ExpandNames(self):
2284
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2285

    
2286
  def BuildHooksEnv(self):
2287
    """Build hooks env.
2288

2289
    This runs on the master node.
2290

2291
    """
2292
    env = {
2293
      "OP_TARGET": self.op.node_name,
2294
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2295
      "OFFLINE": str(self.op.offline),
2296
      "DRAINED": str(self.op.drained),
2297
      }
2298
    nl = [self.cfg.GetMasterNode(),
2299
          self.op.node_name]
2300
    return env, nl, nl
2301

    
2302
  def CheckPrereq(self):
2303
    """Check prerequisites.
2304

2305
    This checks the requested node flags against the node's current state.
2306

2307
    """
2308
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2309

    
2310
    if ((self.op.master_candidate == False or self.op.offline == True or
2311
         self.op.drained == True) and node.master_candidate):
2312
      # we will demote the node from master_candidate
2313
      if self.op.node_name == self.cfg.GetMasterNode():
2314
        raise errors.OpPrereqError("The master node has to be a"
2315
                                   " master candidate, online and not drained")
2316
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2317
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2318
      if num_candidates <= cp_size:
2319
        msg = ("Not enough master candidates (desired"
2320
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2321
        if self.op.force:
2322
          self.LogWarning(msg)
2323
        else:
2324
          raise errors.OpPrereqError(msg)
2325

    
2326
    if (self.op.master_candidate == True and
2327
        ((node.offline and not self.op.offline == False) or
2328
         (node.drained and not self.op.drained == False))):
2329
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2330
                                 " to master_candidate")
2331

    
2332
    return
2333

    
2334
  def Exec(self, feedback_fn):
2335
    """Modifies a node.
2336

2337
    """
2338
    node = self.node
2339

    
2340
    result = []
2341
    changed_mc = False
2342

    
2343
    if self.op.offline is not None:
2344
      node.offline = self.op.offline
2345
      result.append(("offline", str(self.op.offline)))
2346
      if self.op.offline == True:
2347
        if node.master_candidate:
2348
          node.master_candidate = False
2349
          changed_mc = True
2350
          result.append(("master_candidate", "auto-demotion due to offline"))
2351
        if node.drained:
2352
          node.drained = False
2353
          result.append(("drained", "clear drained status due to offline"))
2354

    
2355
    if self.op.master_candidate is not None:
2356
      node.master_candidate = self.op.master_candidate
2357
      changed_mc = True
2358
      result.append(("master_candidate", str(self.op.master_candidate)))
2359
      if self.op.master_candidate == False:
2360
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2361
        msg = rrc.RemoteFailMsg()
2362
        if msg:
2363
          self.LogWarning("Node failed to demote itself: %s" % msg)
2364

    
2365
    if self.op.drained is not None:
2366
      node.drained = self.op.drained
2367
      result.append(("drained", str(self.op.drained)))
2368
      if self.op.drained == True:
2369
        if node.master_candidate:
2370
          node.master_candidate = False
2371
          changed_mc = True
2372
          result.append(("master_candidate", "auto-demotion due to drain"))
2373
        if node.offline:
2374
          node.offline = False
2375
          result.append(("offline", "clear offline status due to drain"))
2376

    
2377
    # this will trigger configuration file update, if needed
2378
    self.cfg.Update(node)
2379
    # this will trigger job queue propagation or cleanup
2380
    if changed_mc:
2381
      self.context.ReaddNode(node)
2382

    
2383
    return result
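
  # The value returned above is a list of (parameter, new value) pairs meant
  # for display to the user; an illustrative run that offlines a master
  # candidate would yield something like:
  #
  #   [("offline", "True"),
  #    ("master_candidate", "auto-demotion due to offline")]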
2384

    
2385

    
2386
class LUQueryClusterInfo(NoHooksLU):
2387
  """Query cluster configuration.
2388

2389
  """
2390
  _OP_REQP = []
2391
  REQ_BGL = False
2392

    
2393
  def ExpandNames(self):
2394
    self.needed_locks = {}
2395

    
2396
  def CheckPrereq(self):
2397
    """No prerequsites needed for this LU.
2398

2399
    """
2400
    pass
2401

    
2402
  def Exec(self, feedback_fn):
2403
    """Return cluster config.
2404

2405
    """
2406
    cluster = self.cfg.GetClusterInfo()
2407
    result = {
2408
      "software_version": constants.RELEASE_VERSION,
2409
      "protocol_version": constants.PROTOCOL_VERSION,
2410
      "config_version": constants.CONFIG_VERSION,
2411
      "os_api_version": constants.OS_API_VERSION,
2412
      "export_version": constants.EXPORT_VERSION,
2413
      "architecture": (platform.architecture()[0], platform.machine()),
2414
      "name": cluster.cluster_name,
2415
      "master": cluster.master_node,
2416
      "default_hypervisor": cluster.default_hypervisor,
2417
      "enabled_hypervisors": cluster.enabled_hypervisors,
2418
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2419
                        for hypervisor in cluster.enabled_hypervisors]),
2420
      "beparams": cluster.beparams,
2421
      "candidate_pool_size": cluster.candidate_pool_size,
2422
      }
2423

    
2424
    return result
2425

    
2426

    
2427
class LUQueryConfigValues(NoHooksLU):
2428
  """Return configuration values.
2429

2430
  """
2431
  _OP_REQP = []
2432
  REQ_BGL = False
2433
  _FIELDS_DYNAMIC = utils.FieldSet()
2434
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2435

    
2436
  def ExpandNames(self):
2437
    self.needed_locks = {}
2438

    
2439
    _CheckOutputFields(static=self._FIELDS_STATIC,
2440
                       dynamic=self._FIELDS_DYNAMIC,
2441
                       selected=self.op.output_fields)
2442

    
2443
  def CheckPrereq(self):
2444
    """No prerequisites.
2445

2446
    """
2447
    pass
2448

    
2449
  def Exec(self, feedback_fn):
2450
    """Dump a representation of the cluster config to the standard output.
2451

2452
    """
2453
    values = []
2454
    for field in self.op.output_fields:
2455
      if field == "cluster_name":
2456
        entry = self.cfg.GetClusterName()
2457
      elif field == "master_node":
2458
        entry = self.cfg.GetMasterNode()
2459
      elif field == "drain_flag":
2460
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2461
      else:
2462
        raise errors.ParameterError(field)
2463
      values.append(entry)
2464
    return values
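
  # Example of a query handled above (fields must come from _FIELDS_STATIC;
  # the returned list preserves the requested order; values illustrative):
  #
  #   output_fields = ["cluster_name", "drain_flag"]
  #   values        = ["cluster.example.com", False]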
2465

    
2466

    
2467
class LUActivateInstanceDisks(NoHooksLU):
2468
  """Bring up an instance's disks.
2469

2470
  """
2471
  _OP_REQP = ["instance_name"]
2472
  REQ_BGL = False
2473

    
2474
  def ExpandNames(self):
2475
    self._ExpandAndLockInstance()
2476
    self.needed_locks[locking.LEVEL_NODE] = []
2477
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2478

    
2479
  def DeclareLocks(self, level):
2480
    if level == locking.LEVEL_NODE:
2481
      self._LockInstancesNodes()
2482

    
2483
  def CheckPrereq(self):
2484
    """Check prerequisites.
2485

2486
    This checks that the instance is in the cluster.
2487

2488
    """
2489
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2490
    assert self.instance is not None, \
2491
      "Cannot retrieve locked instance %s" % self.op.instance_name
2492
    _CheckNodeOnline(self, self.instance.primary_node)
2493

    
2494
  def Exec(self, feedback_fn):
2495
    """Activate the disks.
2496

2497
    """
2498
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2499
    if not disks_ok:
2500
      raise errors.OpExecError("Cannot activate block devices")
2501

    
2502
    return disks_info
2503

    
2504

    
2505
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2506
  """Prepare the block devices for an instance.
2507

2508
  This sets up the block devices on all nodes.
2509

2510
  @type lu: L{LogicalUnit}
2511
  @param lu: the logical unit on whose behalf we execute
2512
  @type instance: L{objects.Instance}
2513
  @param instance: the instance for whose disks we assemble
2514
  @type ignore_secondaries: boolean
2515
  @param ignore_secondaries: if true, errors on secondary nodes
2516
      won't result in an error return from the function
2517
  @return: False if the operation failed, otherwise a list of
2518
      (host, instance_visible_name, node_visible_name)
2519
      with the mapping from node devices to instance devices
2520

2521
  """
2522
  device_info = []
2523
  disks_ok = True
2524
  iname = instance.name
2525
  # With the two passes mechanism we try to reduce the window of
2526
  # opportunity for the race condition of switching DRBD to primary
2527
  # before handshaking occurred, but we do not eliminate it
2528

    
2529
  # The proper fix would be to wait (with some limits) until the
2530
  # connection has been made and drbd transitions from WFConnection
2531
  # into any other network-connected state (Connected, SyncTarget,
2532
  # SyncSource, etc.)
2533

    
2534
  # 1st pass, assemble on all nodes in secondary mode
2535
  for inst_disk in instance.disks:
2536
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2537
      lu.cfg.SetDiskID(node_disk, node)
2538
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2539
      msg = result.RemoteFailMsg()
2540
      if msg:
2541
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2542
                           " (is_primary=False, pass=1): %s",
2543
                           inst_disk.iv_name, node, msg)
2544
        if not ignore_secondaries:
2545
          disks_ok = False
2546

    
2547
  # FIXME: race condition on drbd migration to primary
2548

    
2549
  # 2nd pass, do only the primary node
2550
  for inst_disk in instance.disks:
2551
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2552
      if node != instance.primary_node:
2553
        continue
2554
      lu.cfg.SetDiskID(node_disk, node)
2555
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2556
      msg = result.RemoteFailMsg()
2557
      if msg:
2558
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2559
                           " (is_primary=True, pass=2): %s",
2560
                           inst_disk.iv_name, node, msg)
2561
        disks_ok = False
2562
    device_info.append((instance.primary_node, inst_disk.iv_name,
2563
                        result.payload))
2564

    
2565
  # leave the disks configured for the primary node
2566
  # this is a workaround that would be fixed better by
2567
  # improving the logical/physical id handling
2568
  for disk in instance.disks:
2569
    lu.cfg.SetDiskID(disk, instance.primary_node)
2570

    
2571
  return disks_ok, device_info
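
# Sketch of the helper's return value (device names invented): the boolean
# tells the caller whether every required assembly succeeded, and the list
# maps each disk to the device exposed on the primary node.
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, instance)
#   # disks_ok    -> True
#   # device_info -> [("node1.example.com", "disk/0", "/dev/drbd0")]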
2572

    
2573

    
2574
def _StartInstanceDisks(lu, instance, force):
2575
  """Start the disks of an instance.
2576

2577
  """
2578
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2579
                                           ignore_secondaries=force)
2580
  if not disks_ok:
2581
    _ShutdownInstanceDisks(lu, instance)
2582
    if force is not None and not force:
2583
      lu.proc.LogWarning("", hint="If the message above refers to a"
2584
                         " secondary node,"
2585
                         " you can retry the operation using '--force'.")
2586
    raise errors.OpExecError("Disk consistency error")
2587

    
2588

    
2589
class LUDeactivateInstanceDisks(NoHooksLU):
2590
  """Shutdown an instance's disks.
2591

2592
  """
2593
  _OP_REQP = ["instance_name"]
2594
  REQ_BGL = False
2595

    
2596
  def ExpandNames(self):
2597
    self._ExpandAndLockInstance()
2598
    self.needed_locks[locking.LEVEL_NODE] = []
2599
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2600

    
2601
  def DeclareLocks(self, level):
2602
    if level == locking.LEVEL_NODE:
2603
      self._LockInstancesNodes()
2604

    
2605
  def CheckPrereq(self):
2606
    """Check prerequisites.
2607

2608
    This checks that the instance is in the cluster.
2609

2610
    """
2611
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2612
    assert self.instance is not None, \
2613
      "Cannot retrieve locked instance %s" % self.op.instance_name
2614

    
2615
  def Exec(self, feedback_fn):
2616
    """Deactivate the disks
2617

2618
    """
2619
    instance = self.instance
2620
    _SafeShutdownInstanceDisks(self, instance)
2621

    
2622

    
2623
def _SafeShutdownInstanceDisks(lu, instance):
2624
  """Shutdown block devices of an instance.
2625

2626
  This function checks if an instance is running, before calling
2627
  _ShutdownInstanceDisks.
2628

2629
  """
2630
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2631
                                      [instance.hypervisor])
2632
  ins_l = ins_l[instance.primary_node]
2633
  if ins_l.failed or not isinstance(ins_l.data, list):
2634
    raise errors.OpExecError("Can't contact node '%s'" %
2635
                             instance.primary_node)
2636

    
2637
  if instance.name in ins_l.data:
2638
    raise errors.OpExecError("Instance is running, can't shutdown"
2639
                             " block devices.")
2640

    
2641
  _ShutdownInstanceDisks(lu, instance)
2642

    
2643

    
2644
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2645
  """Shutdown block devices of an instance.
2646

2647
  This does the shutdown on all nodes of the instance.
2648

2649
  Errors on the primary node are ignored only if ignore_primary is
  true.
2651

2652
  """
2653
  all_result = True
2654
  for disk in instance.disks:
2655
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2656
      lu.cfg.SetDiskID(top_disk, node)
2657
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2658
      msg = result.RemoteFailMsg()
2659
      if msg:
2660
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2661
                      disk.iv_name, node, msg)
2662
        if not ignore_primary or node != instance.primary_node:
2663
          all_result = False
2664
  return all_result
2665

    
2666

    
2667
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2668
  """Checks if a node has enough free memory.
2669

2670
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2674

2675
  @type lu: C{LogicalUnit}
2676
  @param lu: a logical unit from which we get configuration data
2677
  @type node: C{str}
2678
  @param node: the node to check
2679
  @type reason: C{str}
2680
  @param reason: string to use in the error message
2681
  @type requested: C{int}
2682
  @param requested: the amount of memory in MiB to check for
2683
  @type hypervisor_name: C{str}
2684
  @param hypervisor_name: the hypervisor to ask for memory stats
2685
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2686
      we cannot check the node
2687

2688
  """
2689
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2690
  nodeinfo[node].Raise()
2691
  free_mem = nodeinfo[node].data.get('memory_free')
2692
  if not isinstance(free_mem, int):
2693
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2694
                             " was '%s'" % (node, free_mem))
2695
  if requested > free_mem:
2696
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2697
                             " needed %s MiB, available %s MiB" %
2698
                             (node, reason, requested, free_mem))
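
# Usage sketch, mirroring the call made by LUStartupInstance below: the helper
# raises OpPrereqError itself, so callers simply invoke it and carry on when
# no exception is raised.
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)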
2699

    
2700

    
2701
class LUStartupInstance(LogicalUnit):
2702
  """Starts an instance.
2703

2704
  """
2705
  HPATH = "instance-start"
2706
  HTYPE = constants.HTYPE_INSTANCE
2707
  _OP_REQP = ["instance_name", "force"]
2708
  REQ_BGL = False
2709

    
2710
  def ExpandNames(self):
2711
    self._ExpandAndLockInstance()
2712

    
2713
  def BuildHooksEnv(self):
2714
    """Build hooks env.
2715

2716
    This runs on master, primary and secondary nodes of the instance.
2717

2718
    """
2719
    env = {
2720
      "FORCE": self.op.force,
2721
      }
2722
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2723
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2724
    return env, nl, nl
2725

    
2726
  def CheckPrereq(self):
2727
    """Check prerequisites.
2728

2729
    This checks that the instance is in the cluster.
2730

2731
    """
2732
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2733
    assert self.instance is not None, \
2734
      "Cannot retrieve locked instance %s" % self.op.instance_name
2735

    
2736
    _CheckNodeOnline(self, instance.primary_node)
2737

    
2738
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2739
    # check bridge existence
2740
    _CheckInstanceBridgesExist(self, instance)
2741

    
2742
    _CheckNodeFreeMemory(self, instance.primary_node,
2743
                         "starting instance %s" % instance.name,
2744
                         bep[constants.BE_MEMORY], instance.hypervisor)
2745

    
2746
  def Exec(self, feedback_fn):
2747
    """Start the instance.
2748

2749
    """
2750
    instance = self.instance
2751
    force = self.op.force
2752

    
2753
    self.cfg.MarkInstanceUp(instance.name)
2754

    
2755
    node_current = instance.primary_node
2756

    
2757
    _StartInstanceDisks(self, instance, force)
2758

    
2759
    result = self.rpc.call_instance_start(node_current, instance)
2760
    msg = result.RemoteFailMsg()
2761
    if msg:
2762
      _ShutdownInstanceDisks(self, instance)
2763
      raise errors.OpExecError("Could not start instance: %s" % msg)
2764

    
2765

    
2766
class LURebootInstance(LogicalUnit):
2767
  """Reboot an instance.
2768

2769
  """
2770
  HPATH = "instance-reboot"
2771
  HTYPE = constants.HTYPE_INSTANCE
2772
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2773
  REQ_BGL = False
2774

    
2775
  def ExpandNames(self):
2776
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2777
                                   constants.INSTANCE_REBOOT_HARD,
2778
                                   constants.INSTANCE_REBOOT_FULL]:
2779
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2780
                                  (constants.INSTANCE_REBOOT_SOFT,
2781
                                   constants.INSTANCE_REBOOT_HARD,
2782
                                   constants.INSTANCE_REBOOT_FULL))
2783
    self._ExpandAndLockInstance()
2784

    
2785
  def BuildHooksEnv(self):
2786
    """Build hooks env.
2787

2788
    This runs on master, primary and secondary nodes of the instance.
2789

2790
    """
2791
    env = {
2792
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2793
      "REBOOT_TYPE": self.op.reboot_type,
2794
      }
2795
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2796
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2797
    return env, nl, nl
2798

    
2799
  def CheckPrereq(self):
2800
    """Check prerequisites.
2801

2802
    This checks that the instance is in the cluster.
2803

2804
    """
2805
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2806
    assert self.instance is not None, \
2807
      "Cannot retrieve locked instance %s" % self.op.instance_name
2808

    
2809
    _CheckNodeOnline(self, instance.primary_node)
2810

    
2811
    # check bridge existence
2812
    _CheckInstanceBridgesExist(self, instance)
2813

    
2814
  def Exec(self, feedback_fn):
2815
    """Reboot the instance.
2816

2817
    """
2818
    instance = self.instance
2819
    ignore_secondaries = self.op.ignore_secondaries
2820
    reboot_type = self.op.reboot_type
2821

    
2822
    node_current = instance.primary_node
2823

    
2824
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2825
                       constants.INSTANCE_REBOOT_HARD]:
2826
      for disk in instance.disks:
2827
        self.cfg.SetDiskID(disk, node_current)
2828
      result = self.rpc.call_instance_reboot(node_current, instance,
2829
                                             reboot_type)
2830
      msg = result.RemoteFailMsg()
2831
      if msg:
2832
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2833
    else:
2834
      result = self.rpc.call_instance_shutdown(node_current, instance)
2835
      msg = result.RemoteFailMsg()
2836
      if msg:
2837
        raise errors.OpExecError("Could not shutdown instance for"
2838
                                 " full reboot: %s" % msg)
2839
      _ShutdownInstanceDisks(self, instance)
2840
      _StartInstanceDisks(self, instance, ignore_secondaries)
2841
      result = self.rpc.call_instance_start(node_current, instance)
2842
      msg = result.RemoteFailMsg()
2843
      if msg:
2844
        _ShutdownInstanceDisks(self, instance)
2845
        raise errors.OpExecError("Could not start instance for"
2846
                                 " full reboot: %s" % msg)
2847

    
2848
    self.cfg.MarkInstanceUp(instance.name)
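
  # Summary of the branches above: soft and hard reboots are delegated to the
  # node daemon via call_instance_reboot, while a full reboot is emulated from
  # the master as shutdown + disk restart + start.  An illustrative opcode for
  # a full reboot (assuming the matching opcode class in the opcodes module):
  #
  #   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
  #                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
  #                                 ignore_secondaries=False)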
2849

    
2850

    
2851
class LUShutdownInstance(LogicalUnit):
2852
  """Shutdown an instance.
2853

2854
  """
2855
  HPATH = "instance-stop"
2856
  HTYPE = constants.HTYPE_INSTANCE
2857
  _OP_REQP = ["instance_name"]
2858
  REQ_BGL = False
2859

    
2860
  def ExpandNames(self):
2861
    self._ExpandAndLockInstance()
2862

    
2863
  def BuildHooksEnv(self):
2864
    """Build hooks env.
2865

2866
    This runs on master, primary and secondary nodes of the instance.
2867

2868
    """
2869
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2870
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2871
    return env, nl, nl
2872

    
2873
  def CheckPrereq(self):
2874
    """Check prerequisites.
2875

2876
    This checks that the instance is in the cluster.
2877

2878
    """
2879
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2880
    assert self.instance is not None, \
2881
      "Cannot retrieve locked instance %s" % self.op.instance_name
2882
    _CheckNodeOnline(self, self.instance.primary_node)
2883

    
2884
  def Exec(self, feedback_fn):
2885
    """Shutdown the instance.
2886

2887
    """
2888
    instance = self.instance
2889
    node_current = instance.primary_node
2890
    self.cfg.MarkInstanceDown(instance.name)
2891
    result = self.rpc.call_instance_shutdown(node_current, instance)
2892
    msg = result.RemoteFailMsg()
2893
    if msg:
2894
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2895

    
2896
    _ShutdownInstanceDisks(self, instance)
2897

    
2898

    
2899
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


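# Usage sketch for LUReinstallInstance above: the instance must be stopped,
# and an optional os_type switches the OS before the create scripts run.
# Assuming the usual opcode names (the OS name is illustrative only):
#
#   op = opcodes.OpReinstallInstance(instance_name="instance1.example.com",
#                                    os_type="debootstrap")
#   cli.SubmitOpCode(op)
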
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


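# Usage sketch for LURenameInstance above: the new name must resolve and not
# clash with an existing instance, and the instance must be stopped.
# Assuming the usual opcode names (ignore_ip is optional on the opcode):
#
#   op = opcodes.OpRenameInstance(instance_name="old.example.com",
#                                 new_name="new.example.com",
#                                 ignore_ip=False)
#   cli.SubmitOpCode(op)
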
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


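# Usage sketch for LURemoveInstance above: shutdown or disk-removal errors
# abort the removal unless ignore_failures is set.  Assuming the usual
# opcode names:
#
#   op = opcodes.OpRemoveInstance(instance_name="instance1.example.com",
#                                 ignore_failures=False)
#   cli.SubmitOpCode(op)
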
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


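# Usage sketch for LUQueryInstances above: output_fields mixes plain fields
# with the regex-style "variable list" fields handled in Exec.  Assuming the
# usual opcode names, a query might look like:
#
#   op = opcodes.OpQueryInstances(output_fields=["name", "status",
#                                                "disk.sizes", "nic.macs",
#                                                "be/memory"],
#                                 names=[], use_locking=False)
#   result = cli.SubmitOpCode(op)
#
# The result is a list of rows, one per instance, with values in the same
# order as output_fields.
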
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


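# Usage sketch for LUFailoverInstance above: failover shuts the instance down
# on the primary and restarts it on the (single) mirrored secondary.
# Assuming the usual opcode names:
#
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)
#   cli.SubmitOpCode(op)
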
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


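# Usage sketch for LUMigrateInstance above: unlike failover, migration keeps
# the instance running and only works for DRBD8-based instances;
# cleanup=True reuses the same LU to recover from a previously failed
# migration.  Assuming the usual opcode names:
#
#   op = opcodes.OpMigrateInstance(instance_name="instance1.example.com",
#                                  live=True, cleanup=False)
#   cli.SubmitOpCode(op)
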
def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


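# Sketch of a call to _CreateBlockDev above, with a hypothetical standalone
# LV disk (all names below are illustrative only): the helper first recurses
# into any children and then creates the device itself once force_create has
# been set, e.g. via CreateOnSecondary().
#
#   disk = objects.Disk(dev_type=constants.LD_LV, size=1024,
#                       logical_id=("xenvg", "example-lv"), iv_name="disk/0")
#   _CreateBlockDev(lu, node, instance, disk, False, "example-info", True)
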
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate logical volume names for the given instance,
  one per extension in C{exts}.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


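# Sketch of how _GenerateUniqueNames above is typically combined with the
# DRBD branch generator below: one data and one metadata LV name per disk
# (the suffixes are illustrative, following the ".diskN_data"/".diskN_meta"
# convention used elsewhere in this module):
#
#   names = _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   # names[0] becomes the data LV, names[1] the metadata LV
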
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,