lib/cmdlib/instance_migration.py @ 1c3231aa


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with instance migration an failover."""
23

    
24
import logging
25
import time
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti.masterd import iallocator
31
from ganeti import utils
32
from ganeti.cmdlib.base import LogicalUnit, Tasklet
33
from ganeti.cmdlib.common import ExpandInstanceName, \
34
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
35
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
36
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
37
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
38
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
39
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
40

    
41
import ganeti.masterd.instance
42

    
43

    
44
def _ExpandNamesForMigration(lu):
45
  """Expands names for use with L{TLMigrateInstance}.
46

47
  @type lu: L{LogicalUnit}
48

49
  """
50
  if lu.op.target_node is not None:
51
    (lu.op.target_node_uuid, lu.op.target_node) = \
52
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
53

    
54
  lu.needed_locks[locking.LEVEL_NODE] = []
55
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
56

    
57
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
58
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
59

    
60
  # The node allocation lock is actually only needed for externally replicated
61
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
62
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
63

    
64

    
65
def _DeclareLocksForMigration(lu, level):
66
  """Declares locks for L{TLMigrateInstance}.
67

68
  @type lu: L{LogicalUnit}
69
  @param level: Lock level
70

71
  """
72
  if level == locking.LEVEL_NODE_ALLOC:
73
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
74

    
75
    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
76

    
77
    # Node locks are already declared here rather than at LEVEL_NODE as we need
78
    # the instance object anyway to declare the node allocation lock.
79
    if instance.disk_template in constants.DTS_EXT_MIRROR:
80
      if lu.op.target_node is None:
81
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
82
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
83
      else:
84
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
85
                                               lu.op.target_node_uuid]
86
      del lu.recalculate_locks[locking.LEVEL_NODE]
87
    else:
88
      lu._LockInstancesNodes() # pylint: disable=W0212
89

    
90
  elif level == locking.LEVEL_NODE:
91
    # Node locks are declared together with the node allocation lock
92
    assert (lu.needed_locks[locking.LEVEL_NODE] or
93
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
94

    
95
  elif level == locking.LEVEL_NODE_RES:
96
    # Copy node locks
97
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
98
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
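# Both helpers above are used in tandem by LUInstanceFailover and
# LUInstanceMigrate below: their ExpandNames() methods call
# _ExpandNamesForMigration() and build a TLMigrateInstance tasklet, while
# their DeclareLocks() methods simply delegate to _DeclareLocksForMigration().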
99

    
100

    
101
class LUInstanceFailover(LogicalUnit):
102
  """Failover an instance.
103

104
  """
105
  HPATH = "instance-failover"
106
  HTYPE = constants.HTYPE_INSTANCE
107
  REQ_BGL = False
108

    
109
  def CheckArguments(self):
110
    """Check the arguments.
111

112
    """
113
    self.iallocator = getattr(self.op, "iallocator", None)
114
    self.target_node = getattr(self.op, "target_node", None)
115

    
116
  def ExpandNames(self):
117
    self._ExpandAndLockInstance()
118
    _ExpandNamesForMigration(self)
119

    
120
    self._migrater = \
121
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
122
                        self.op.ignore_consistency, True,
123
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
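    # The positional arguments map onto TLMigrateInstance.__init__ as
    # cleanup=False, failover=True, fallback=False,
    # ignore_consistency=self.op.ignore_consistency and
    # allow_runtime_changes=True, i.e. a plain failover that is never a
    # cleanup of an earlier migration.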
124

    
125
    self.tasklets = [self._migrater]
126

    
127
  def DeclareLocks(self, level):
128
    _DeclareLocksForMigration(self, level)
129

    
130
  def BuildHooksEnv(self):
131
    """Build hooks env.
132

133
    This runs on master, primary and secondary nodes of the instance.
134

135
    """
136
    instance = self._migrater.instance
137
    source_node_uuid = instance.primary_node
138
    env = {
139
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
140
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
141
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
142
      "NEW_PRIMARY": self.op.target_node,
143
      }
144

    
145
    if instance.disk_template in constants.DTS_INT_MIRROR:
146
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
147
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
148
    else:
149
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
150

    
151
    env.update(BuildInstanceHookEnvByObject(self, instance))
152

    
153
    return env
154

    
155
  def BuildHooksNodes(self):
156
    """Build hooks nodes.
157

158
    """
159
    instance = self._migrater.instance
160
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
161
    return (nl, nl + [instance.primary_node])
162

    
163

    
164
class LUInstanceMigrate(LogicalUnit):
165
  """Migrate an instance.
166

167
  This is migration without shutting down, compared to the failover,
168
  which is done with shutdown.
169

170
  """
171
  HPATH = "instance-migrate"
172
  HTYPE = constants.HTYPE_INSTANCE
173
  REQ_BGL = False
174

    
175
  def ExpandNames(self):
176
    self._ExpandAndLockInstance()
177
    _ExpandNamesForMigration(self)
178

    
179
    self._migrater = \
180
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
181
                        False, self.op.allow_failover, False,
182
                        self.op.allow_runtime_changes,
183
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
184
                        self.op.ignore_ipolicy)
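    # Here the positional arguments mean cleanup=self.op.cleanup,
    # failover=False, fallback=self.op.allow_failover,
    # ignore_consistency=False and
    # allow_runtime_changes=self.op.allow_runtime_changes; the shutdown
    # timeout only matters if the migration falls back to a failover.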
185

    
186
    self.tasklets = [self._migrater]
187

    
188
  def DeclareLocks(self, level):
189
    _DeclareLocksForMigration(self, level)
190

    
191
  def BuildHooksEnv(self):
192
    """Build hooks env.
193

194
    This runs on master, primary and secondary nodes of the instance.
195

196
    """
197
    instance = self._migrater.instance
198
    source_node_uuid = instance.primary_node
199
    env = BuildInstanceHookEnvByObject(self, instance)
200
    env.update({
201
      "MIGRATE_LIVE": self._migrater.live,
202
      "MIGRATE_CLEANUP": self.op.cleanup,
203
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
204
      "NEW_PRIMARY": self.op.target_node,
205
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
206
      })
207

    
208
    if instance.disk_template in constants.DTS_INT_MIRROR:
209
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
210
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
211
    else:
212
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
213

    
214
    return env
215

    
216
  def BuildHooksNodes(self):
217
    """Build hooks nodes.
218

219
    """
220
    instance = self._migrater.instance
221
    snode_uuids = list(instance.secondary_nodes)
222
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
223
    return (nl, nl)
224

    
225

    
226
class TLMigrateInstance(Tasklet):
227
  """Tasklet class for instance migration.
228

229
  @type live: boolean
230
  @ivar live: whether the migration will be done live or non-live;
231
      this variable is initialized only after CheckPrereq has run
232
  @type cleanup: boolean
233
  @ivar cleanup: Whether we are cleaning up from a failed migration
234
  @type iallocator: string
235
  @ivar iallocator: The iallocator used to determine target_node
236
  @type target_node_uuid: string
237
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
238
      instance to
239
  @type failover: boolean
240
  @ivar failover: Whether operation results in failover or migration
241
  @type fallback: boolean
242
  @ivar fallback: Whether fallback to failover is allowed if migration not
243
                  possible
244
  @type ignore_consistency: boolean
245
  @ivar ignore_consistency: Whether we should ignore consistency between source
246
                            and target node
247
  @type shutdown_timeout: int
248
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
249
  @type ignore_ipolicy: bool
250
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
251

252
  """
253

    
254
  # Constants
255
  _MIGRATION_POLL_INTERVAL = 1      # seconds
256
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
257

    
258
  def __init__(self, lu, instance_name, cleanup, failover, fallback,
259
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
260
               ignore_ipolicy):
261
    """Initializes this class.
262

263
    """
264
    Tasklet.__init__(self, lu)
265

    
266
    # Parameters
267
    self.instance_name = instance_name
268
    self.cleanup = cleanup
269
    self.live = False # will be overridden later
270
    self.failover = failover
271
    self.fallback = fallback
272
    self.ignore_consistency = ignore_consistency
273
    self.shutdown_timeout = shutdown_timeout
274
    self.ignore_ipolicy = ignore_ipolicy
275
    self.allow_runtime_changes = allow_runtime_changes
276

    
277
  def CheckPrereq(self):
278
    """Check prerequisites.
279

280
    This checks that the instance is in the cluster.
281

282
    """
283
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
284
    instance = self.cfg.GetInstanceInfo(instance_name)
285
    assert instance is not None
286
    self.instance = instance
287
    cluster = self.cfg.GetClusterInfo()
288

    
289
    if (not self.cleanup and
290
        not instance.admin_state == constants.ADMINST_UP and
291
        not self.failover and self.fallback):
292
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
293
                      " switching to failover")
294
      self.failover = True
295

    
296
    if instance.disk_template not in constants.DTS_MIRRORED:
297
      if self.failover:
298
        text = "failovers"
299
      else:
300
        text = "migrations"
301
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
302
                                 " %s" % (instance.disk_template, text),
303
                                 errors.ECODE_STATE)
304

    
305
    if instance.disk_template in constants.DTS_EXT_MIRROR:
306
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
307

    
308
      if self.lu.op.iallocator:
309
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
310
        self._RunAllocator()
311
      else:
312
        # We set self.target_node_uuid as it is required by
313
        # BuildHooksEnv
314
        self.target_node_uuid = self.lu.op.target_node_uuid
315

    
316
      # Check that the target node is correct in terms of instance policy
317
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
318
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
319
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
320
                                                              group_info)
321
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
322
                             ignore=self.ignore_ipolicy)
323

    
324
      # self.target_node_uuid is already populated, either directly or by the
325
      # iallocator run
326
      target_node_uuid = self.target_node_uuid
327
      if self.target_node_uuid == instance.primary_node:
328
        raise errors.OpPrereqError(
329
          "Cannot migrate instance %s to its primary (%s)" %
330
          (instance.name, self.cfg.GetNodeName(instance.primary_node)),
331
          errors.ECODE_STATE)
332

    
333
      if len(self.lu.tasklets) == 1:
334
        # It is safe to release locks only when we're the only tasklet
335
        # in the LU
336
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
337
                     keep=[instance.primary_node, self.target_node_uuid])
338
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
339

    
340
    else:
341
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
342

    
343
      secondary_node_uuids = instance.secondary_nodes
344
      if not secondary_node_uuids:
345
        raise errors.ConfigurationError("No secondary node but using"
346
                                        " %s disk template" %
347
                                        instance.disk_template)
348
      target_node_uuid = secondary_node_uuids[0]
349
      if self.lu.op.iallocator or \
350
        (self.lu.op.target_node_uuid and
351
         self.lu.op.target_node_uuid != target_node_uuid):
352
        if self.failover:
353
          text = "failed over"
354
        else:
355
          text = "migrated"
356
        raise errors.OpPrereqError("Instances with disk template %s cannot"
357
                                   " be %s to arbitrary nodes"
358
                                   " (neither an iallocator nor a target"
359
                                   " node can be passed)" %
360
                                   (instance.disk_template, text),
361
                                   errors.ECODE_INVAL)
362
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
363
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
364
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
365
                                                              group_info)
366
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
367
                             ignore=self.ignore_ipolicy)
368

    
369
    i_be = cluster.FillBE(instance)
370

    
371
    # check memory requirements on the secondary node
372
    if (not self.cleanup and
373
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
374
      self.tgt_free_mem = CheckNodeFreeMemory(
375
          self.lu, target_node_uuid, "migrating instance %s" % instance.name,
376
          i_be[constants.BE_MINMEM], instance.hypervisor,
377
          self.cfg.GetClusterInfo().hvparams[instance.hypervisor])
378
    else:
379
      self.lu.LogInfo("Not checking memory on the secondary node as"
380
                      " instance will not be started")
381

    
382
    # check if failover must be forced instead of migration
383
    if (not self.cleanup and not self.failover and
384
        i_be[constants.BE_ALWAYS_FAILOVER]):
385
      self.lu.LogInfo("Instance configured to always failover; fallback"
386
                      " to failover")
387
      self.failover = True
388

    
389
    # check bridge existence
390
    CheckInstanceBridgesExist(self.lu, instance, node_uuid=target_node_uuid)
391

    
392
    if not self.cleanup:
393
      CheckNodeNotDrained(self.lu, target_node_uuid)
394
      if not self.failover:
395
        result = self.rpc.call_instance_migratable(instance.primary_node,
396
                                                   instance)
397
        if result.fail_msg and self.fallback:
398
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
399
                          " failover")
400
          self.failover = True
401
        else:
402
          result.Raise("Can't migrate, please use failover",
403
                       prereq=True, ecode=errors.ECODE_STATE)
404

    
405
    assert not (self.failover and self.cleanup)
406

    
407
    if not self.failover:
408
      if self.lu.op.live is not None and self.lu.op.mode is not None:
409
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
410
                                   " parameters are accepted",
411
                                   errors.ECODE_INVAL)
412
      if self.lu.op.live is not None:
413
        if self.lu.op.live:
414
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
415
        else:
416
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
417
        # reset the 'live' parameter to None so that repeated
418
        # invocations of CheckPrereq do not raise an exception
419
        self.lu.op.live = None
420
      elif self.lu.op.mode is None:
421
        # read the default value from the hypervisor
422
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
423
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
424

    
425
      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
426
    else:
427
      # Failover is never live
428
      self.live = False
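    # At this point self.live reflects the effective mode: an explicit
    # op.live wins over op.mode, op.mode falls back to the hypervisor's
    # HV_MIGRATION_MODE default, and a failover is never live.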
429

    
430
    if not (self.failover or self.cleanup):
431
      remote_info = self.rpc.call_instance_info(
432
          instance.primary_node, instance.name, instance.hypervisor,
433
          cluster.hvparams[instance.hypervisor])
434
      remote_info.Raise("Error checking instance on node %s" %
435
                        self.cfg.GetNodeName(instance.primary_node))
436
      instance_running = bool(remote_info.payload)
437
      if instance_running:
438
        self.current_mem = int(remote_info.payload["memory"])
439

    
440
  def _RunAllocator(self):
441
    """Run the allocator based on input opcode.
442

443
    """
444
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
445

    
446
    # FIXME: add a self.ignore_ipolicy option
447
    req = iallocator.IAReqRelocate(
448
          name=self.instance_name,
449
          relocate_from_node_uuids=[self.instance.primary_node])
450
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
451

    
452
    ial.Run(self.lu.op.iallocator)
453

    
454
    if not ial.success:
455
      raise errors.OpPrereqError("Can't compute nodes using"
456
                                 " iallocator '%s': %s" %
457
                                 (self.lu.op.iallocator, ial.info),
458
                                 errors.ECODE_NORES)
459
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
460
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
461
                    self.instance_name, self.lu.op.iallocator,
462
                    utils.CommaJoin(ial.result))
463

    
464
  def _WaitUntilSync(self):
465
    """Poll with custom rpc for disk sync.
466

467
    This uses our own step-based rpc call.
468

469
    """
470
    self.feedback_fn("* wait until resync is done")
471
    all_done = False
472
    while not all_done:
473
      all_done = True
474
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
475
                                            self.nodes_ip,
476
                                            (self.instance.disks,
477
                                             self.instance))
478
      min_percent = 100
479
      for node_uuid, nres in result.items():
480
        nres.Raise("Cannot resync disks on node %s" %
481
                   self.cfg.GetNodeName(node_uuid))
482
        node_done, node_percent = nres.payload
483
        all_done = all_done and node_done
484
        if node_percent is not None:
485
          min_percent = min(min_percent, node_percent)
486
      if not all_done:
487
        if min_percent < 100:
488
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
489
        time.sleep(2)
490

    
491
  def _EnsureSecondary(self, node_uuid):
492
    """Demote a node to secondary.
493

494
    """
495
    self.feedback_fn("* switching node %s to secondary mode" %
496
                     self.cfg.GetNodeName(node_uuid))
497

    
498
    for dev in self.instance.disks:
499
      self.cfg.SetDiskID(dev, node_uuid)
500

    
501
    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
502
                                          self.instance.disks)
503
    result.Raise("Cannot change disk to secondary on node %s" %
504
                 self.cfg.GetNodeName(node_uuid))
505

    
506
  def _GoStandalone(self):
507
    """Disconnect from the network.
508

509
    """
510
    self.feedback_fn("* changing into standalone mode")
511
    result = self.rpc.call_drbd_disconnect_net(self.all_node_uuids,
512
                                               self.nodes_ip,
513
                                               self.instance.disks)
514
    for node_uuid, nres in result.items():
515
      nres.Raise("Cannot disconnect disks node %s" %
516
                 self.cfg.GetNodeName(node_uuid))
517

    
518
  def _GoReconnect(self, multimaster):
519
    """Reconnect to the network.
520

521
    """
522
    if multimaster:
523
      msg = "dual-master"
524
    else:
525
      msg = "single-master"
526
    self.feedback_fn("* changing disks into %s mode" % msg)
527
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids, self.nodes_ip,
528
                                           (self.instance.disks, self.instance),
529
                                           self.instance.name, multimaster)
530
    for node_uuid, nres in result.items():
531
      nres.Raise("Cannot change disks config on node %s" %
532
                 self.cfg.GetNodeName(node_uuid))
533

    
534
  def _ExecCleanup(self):
535
    """Try to cleanup after a failed migration.
536

537
    The cleanup is done by:
538
      - check that the instance is running only on one node
539
        (and update the config if needed)
540
      - change disks on its secondary node to secondary
541
      - wait until disks are fully synchronized
542
      - disconnect from the network
543
      - change disks into single-master mode
544
      - wait again until disks are fully synchronized
545

546
    """
547
    instance = self.instance
548
    target_node_uuid = self.target_node_uuid
549
    source_node_uuid = self.source_node_uuid
550

    
551
    # check running on only one node
552
    self.feedback_fn("* checking where the instance actually runs"
553
                     " (if this hangs, the hypervisor might be in"
554
                     " a bad state)")
555
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
556
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
557
                                        [instance.hypervisor],
558
                                        cluster_hvparams)
559
    for node_uuid, result in ins_l.items():
560
      result.Raise("Can't contact node %s" % node_uuid)
561

    
562
    runningon_source = instance.name in ins_l[source_node_uuid].payload
563
    runningon_target = instance.name in ins_l[target_node_uuid].payload
564

    
565
    if runningon_source and runningon_target:
566
      raise errors.OpExecError("Instance seems to be running on two nodes,"
567
                               " or the hypervisor is confused; you will have"
568
                               " to ensure manually that it runs only on one"
569
                               " and restart this operation")
570

    
571
    if not (runningon_source or runningon_target):
572
      raise errors.OpExecError("Instance does not seem to be running at all;"
573
                               " in this case it's safer to repair by"
574
                               " running 'gnt-instance stop' to ensure disk"
575
                               " shutdown, and then restarting it")
576

    
577
    if runningon_target:
578
      # the migration has actually succeeded, we need to update the config
579
      self.feedback_fn("* instance running on secondary node (%s),"
580
                       " updating config" %
581
                       self.cfg.GetNodeName(target_node_uuid))
582
      instance.primary_node = target_node_uuid
583
      self.cfg.Update(instance, self.feedback_fn)
584
      demoted_node_uuid = source_node_uuid
585
    else:
586
      self.feedback_fn("* instance confirmed to be running on its"
587
                       " primary node (%s)" %
588
                       self.cfg.GetNodeName(source_node_uuid))
589
      demoted_node_uuid = target_node_uuid
590

    
591
    if instance.disk_template in constants.DTS_INT_MIRROR:
592
      self._EnsureSecondary(demoted_node_uuid)
593
      try:
594
        self._WaitUntilSync()
595
      except errors.OpExecError:
596
        # we ignore errors here, since if the device is standalone, it
597
        # won't be able to sync
598
        pass
599
      self._GoStandalone()
600
      self._GoReconnect(False)
601
      self._WaitUntilSync()
602

    
603
    self.feedback_fn("* done")
604

    
605
  def _RevertDiskStatus(self):
606
    """Try to revert the disk status after a failed migration.
607

608
    """
609
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
610
      return
611

    
612
    try:
613
      self._EnsureSecondary(self.target_node_uuid)
614
      self._GoStandalone()
615
      self._GoReconnect(False)
616
      self._WaitUntilSync()
617
    except errors.OpExecError, err:
618
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
619
                         " please try to recover the instance manually;"
620
                         " error '%s'" % str(err))
621

    
622
  def _AbortMigration(self):
623
    """Call the hypervisor code to abort a started migration.
624

625
    """
626
    instance = self.instance
627
    migration_info = self.migration_info
628

    
629
    abort_result = self.rpc.call_instance_finalize_migration_dst(
630
                     self.target_node_uuid, instance, migration_info, False)
631
    abort_msg = abort_result.fail_msg
632
    if abort_msg:
633
      logging.error("Aborting migration failed on target node %s: %s",
634
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
635
      # Don't raise an exception here, as we still have to try to revert the
636
      # disk status, even if this step failed.
637

    
638
    abort_result = self.rpc.call_instance_finalize_migration_src(
639
      self.source_node_uuid, instance, False, self.live)
640
    abort_msg = abort_result.fail_msg
641
    if abort_msg:
642
      logging.error("Aborting migration failed on source node %s: %s",
643
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
644

    
645
  def _ExecMigration(self):
646
    """Migrate an instance.
647

648
    The migration is done by:
649
      - change the disks into dual-master mode
650
      - wait until disks are fully synchronized again
651
      - migrate the instance
652
      - change disks on the new secondary node (the old primary) to secondary
653
      - wait until disks are fully synchronized
654
      - change disks into single-master mode
655

656
    """
657
    instance = self.instance
658
    target_node_uuid = self.target_node_uuid
659
    source_node_uuid = self.source_node_uuid
660

    
661
    # Check for hypervisor version mismatch and warn the user.
662
    hvspecs = [(instance.hypervisor,
663
                self.cfg.GetClusterInfo().hvparams[instance.hypervisor])]
664
    nodeinfo = self.rpc.call_node_info([source_node_uuid, target_node_uuid],
665
                                       None, hvspecs, False)
666
    for ninfo in nodeinfo.values():
667
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
668
                  ninfo.node)
669
    (_, _, (src_info, )) = nodeinfo[source_node_uuid].payload
670
    (_, _, (dst_info, )) = nodeinfo[target_node_uuid].payload
671

    
672
    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
673
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
674
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
675
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
676
      if src_version != dst_version:
677
        self.feedback_fn("* warning: hypervisor version mismatch between"
678
                         " source (%s) and target (%s) node" %
679
                         (src_version, dst_version))
680

    
681
    self.feedback_fn("* checking disk consistency between source and target")
682
    for (idx, dev) in enumerate(instance.disks):
683
      if not CheckDiskConsistency(self.lu, instance, dev, target_node_uuid,
684
                                  False):
685
        raise errors.OpExecError("Disk %s is degraded or not fully"
686
                                 " synchronized on target node,"
687
                                 " aborting migration" % idx)
688

    
689
    if self.current_mem > self.tgt_free_mem:
690
      if not self.allow_runtime_changes:
691
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
692
                                 " free memory to fit instance %s on target"
693
                                 " node %s (have %dMB, need %dMB)" %
694
                                 (instance.name,
695
                                  self.cfg.GetNodeName(target_node_uuid),
696
                                  self.tgt_free_mem, self.current_mem))
697
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
698
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
699
                                                     instance,
700
                                                     self.tgt_free_mem)
701
      rpcres.Raise("Cannot modify instance runtime memory")
702

    
703
    # First get the migration information from the remote node
704
    result = self.rpc.call_migration_info(source_node_uuid, instance)
705
    msg = result.fail_msg
706
    if msg:
707
      log_err = ("Failed fetching source migration information from %s: %s" %
708
                 (self.cfg.GetNodeName(source_node_uuid), msg))
709
      logging.error(log_err)
710
      raise errors.OpExecError(log_err)
711

    
712
    self.migration_info = migration_info = result.payload
713

    
714
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
715
      # Then switch the disks to master/master mode
716
      self._EnsureSecondary(target_node_uuid)
717
      self._GoStandalone()
718
      self._GoReconnect(True)
719
      self._WaitUntilSync()
720

    
721
    self.feedback_fn("* preparing %s to accept the instance" %
722
                     self.cfg.GetNodeName(target_node_uuid))
723
    result = self.rpc.call_accept_instance(target_node_uuid,
724
                                           instance,
725
                                           migration_info,
726
                                           self.nodes_ip[target_node_uuid])
727

    
728
    msg = result.fail_msg
729
    if msg:
730
      logging.error("Instance pre-migration failed, trying to revert"
731
                    " disk status: %s", msg)
732
      self.feedback_fn("Pre-migration failed, aborting")
733
      self._AbortMigration()
734
      self._RevertDiskStatus()
735
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
736
                               (instance.name, msg))
737

    
738
    self.feedback_fn("* migrating instance to %s" %
739
                     self.cfg.GetNodeName(target_node_uuid))
740
    cluster = self.cfg.GetClusterInfo()
741
    result = self.rpc.call_instance_migrate(
742
        source_node_uuid, cluster.cluster_name, instance,
743
        self.nodes_ip[target_node_uuid], self.live)
744
    msg = result.fail_msg
745
    if msg:
746
      logging.error("Instance migration failed, trying to revert"
747
                    " disk status: %s", msg)
748
      self.feedback_fn("Migration failed, aborting")
749
      self._AbortMigration()
750
      self._RevertDiskStatus()
751
      raise errors.OpExecError("Could not migrate instance %s: %s" %
752
                               (instance.name, msg))
753

    
754
    self.feedback_fn("* starting memory transfer")
755
    last_feedback = time.time()
756
    while True:
757
      result = self.rpc.call_instance_get_migration_status(source_node_uuid,
758
                                                           instance)
759
      msg = result.fail_msg
760
      ms = result.payload   # MigrationStatus instance
761
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
762
        logging.error("Instance migration failed, trying to revert"
763
                      " disk status: %s", msg)
764
        self.feedback_fn("Migration failed, aborting")
765
        self._AbortMigration()
766
        self._RevertDiskStatus()
767
        if not msg:
768
          msg = "hypervisor returned failure"
769
        raise errors.OpExecError("Could not migrate instance %s: %s" %
770
                                 (instance.name, msg))
771

    
772
      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
773
        self.feedback_fn("* memory transfer complete")
774
        break
775

    
776
      if (utils.TimeoutExpired(last_feedback,
777
                               self._MIGRATION_FEEDBACK_INTERVAL) and
778
          ms.transferred_ram is not None):
779
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
780
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
781
        last_feedback = time.time()
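      # e.g. with 1536 of 2048 MB of RAM transferred this reports
      # "memory transfer progress: 75.00 %"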
782

    
783
      time.sleep(self._MIGRATION_POLL_INTERVAL)
784

    
785
    result = self.rpc.call_instance_finalize_migration_src(source_node_uuid,
786
                                                           instance,
787
                                                           True,
788
                                                           self.live)
789
    msg = result.fail_msg
790
    if msg:
791
      logging.error("Instance migration succeeded, but finalization failed"
792
                    " on the source node: %s", msg)
793
      raise errors.OpExecError("Could not finalize instance migration: %s" %
794
                               msg)
795

    
796
    instance.primary_node = target_node_uuid
797

    
798
    # distribute new instance config to the other nodes
799
    self.cfg.Update(instance, self.feedback_fn)
800

    
801
    result = self.rpc.call_instance_finalize_migration_dst(target_node_uuid,
802
                                                           instance,
803
                                                           migration_info,
804
                                                           True)
805
    msg = result.fail_msg
806
    if msg:
807
      logging.error("Instance migration succeeded, but finalization failed"
808
                    " on the target node: %s", msg)
809
      raise errors.OpExecError("Could not finalize instance migration: %s" %
810
                               msg)
811

    
812
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
813
      self._EnsureSecondary(source_node_uuid)
814
      self._WaitUntilSync()
815
      self._GoStandalone()
816
      self._GoReconnect(False)
817
      self._WaitUntilSync()
818

    
819
    # If the instance's disk template is `rbd' or `ext' and there was a
820
    # successful migration, unmap the device from the source node.
821
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
822
      disks = ExpandCheckDisks(instance, instance.disks)
823
      self.feedback_fn("* unmapping instance's disks from %s" %
824
                       self.cfg.GetNodeName(source_node_uuid))
825
      for disk in disks:
826
        result = self.rpc.call_blockdev_shutdown(source_node_uuid,
827
                                                 (disk, instance))
828
        msg = result.fail_msg
829
        if msg:
830
          logging.error("Migration was successful, but couldn't unmap the"
831
                        " block device %s on source node %s: %s",
832
                        disk.iv_name, self.cfg.GetNodeName(source_node_uuid),
833
                        msg)
834
          logging.error("You need to unmap the device %s manually on %s",
835
                        disk.iv_name, self.cfg.GetNodeName(source_node_uuid))
836

    
837
    self.feedback_fn("* done")
838

    
839
  def _ExecFailover(self):
840
    """Failover an instance.
841

842
    The failover is done by shutting it down on its present node and
843
    starting it on the secondary.
844

845
    """
846
    instance = self.instance
847
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
848

    
849
    source_node_uuid = instance.primary_node
850
    target_node_uuid = self.target_node_uuid
851

    
852
    if instance.disks_active:
853
      self.feedback_fn("* checking disk consistency between source and target")
854
      for (idx, dev) in enumerate(instance.disks):
855
        # for drbd, these are drbd over lvm
856
        if not CheckDiskConsistency(self.lu, instance, dev, target_node_uuid,
857
                                    False):
858
          if primary_node.offline:
859
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
860
                             " target node %s" %
861
                             (primary_node.name, idx,
862
                              self.cfg.GetNodeName(target_node_uuid)))
863
          elif not self.ignore_consistency:
864
            raise errors.OpExecError("Disk %s is degraded on target node,"
865
                                     " aborting failover" % idx)
866
    else:
867
      self.feedback_fn("* not checking disk consistency as instance is not"
868
                       " running")
869

    
870
    self.feedback_fn("* shutting down instance on source node")
871
    logging.info("Shutting down instance %s on node %s",
872
                 instance.name, self.cfg.GetNodeName(source_node_uuid))
873

    
874
    result = self.rpc.call_instance_shutdown(source_node_uuid, instance,
875
                                             self.shutdown_timeout,
876
                                             self.lu.op.reason)
877
    msg = result.fail_msg
878
    if msg:
879
      if self.ignore_consistency or primary_node.offline:
880
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
881
                           " proceeding anyway; please make sure node"
882
                           " %s is down; error details: %s",
883
                           instance.name,
884
                           self.cfg.GetNodeName(source_node_uuid),
885
                           self.cfg.GetNodeName(source_node_uuid), msg)
886
      else:
887
        raise errors.OpExecError("Could not shutdown instance %s on"
888
                                 " node %s: %s" %
889
                                 (instance.name,
890
                                  self.cfg.GetNodeName(source_node_uuid), msg))
891

    
892
    self.feedback_fn("* deactivating the instance's disks on source node")
893
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
894
      raise errors.OpExecError("Can't shut down the instance's disks")
895

    
896
    instance.primary_node = target_node_uuid
897
    # distribute new instance config to the other nodes
898
    self.cfg.Update(instance, self.feedback_fn)
899

    
900
    # Only start the instance if it's marked as up
901
    if instance.admin_state == constants.ADMINST_UP:
902
      self.feedback_fn("* activating the instance's disks on target node %s" %
903
                       self.cfg.GetNodeName(target_node_uuid))
904
      logging.info("Starting instance %s on node %s",
905
                   instance.name, self.cfg.GetNodeName(target_node_uuid))
906

    
907
      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
908
                                          ignore_secondaries=True)
909
      if not disks_ok:
910
        ShutdownInstanceDisks(self.lu, instance)
911
        raise errors.OpExecError("Can't activate the instance's disks")
912

    
913
      self.feedback_fn("* starting the instance on the target node %s" %
914
                       self.cfg.GetNodeName(target_node_uuid))
915
      result = self.rpc.call_instance_start(target_node_uuid,
916
                                            (instance, None, None), False,
917
                                            self.lu.op.reason)
918
      msg = result.fail_msg
919
      if msg:
920
        ShutdownInstanceDisks(self.lu, instance)
921
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
922
                                 (instance.name,
923
                                  self.cfg.GetNodeName(target_node_uuid), msg))
924

    
925
  def Exec(self, feedback_fn):
926
    """Perform the migration.
927

928
    """
929
    self.feedback_fn = feedback_fn
930
    self.source_node_uuid = self.instance.primary_node
931

    
932
    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
933
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
934
      self.target_node_uuid = self.instance.secondary_nodes[0]
935
      # Otherwise self.target_node_uuid has been populated either
936
      # directly, or through an iallocator.
937

    
938
    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
939
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
940
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))
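    # nodes_ip maps each involved node UUID to that node's secondary IP;
    # it is handed to the DRBD RPCs (_WaitUntilSync, _GoStandalone,
    # _GoReconnect) and to the migration RPCs in _ExecMigration.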
941

    
942
    if self.failover:
943
      feedback_fn("Failover instance %s" % self.instance.name)
944
      self._ExecFailover()
945
    else:
946
      feedback_fn("Migrating instance %s" % self.instance.name)
947

    
948
      if self.cleanup:
949
        return self._ExecCleanup()
950
      else:
951
        return self._ExecMigration()