#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


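# Note (descriptive comment, derived from the code below): both
# LUInstanceFailover and LUInstanceMigrate delegate the actual work to the
# TLMigrateInstance tasklet; they differ mainly in the flags passed to its
# constructor (failover=True plus a shutdown timeout for failover,
# failover=False plus an optional fallback to failover for migration).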
class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, True,
                        False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

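    # Descriptive note: from here on, the local variable target_node names the
    # migration destination for both the externally and the internally
    # mirrored cases handled above.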
    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
                                              "migrating instance %s" %
                                              instance.name,
                                              i_be[constants.BE_MINMEM],
                                              instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

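  # Descriptive note: the helpers below (_EnsureSecondary, _GoStandalone,
  # _GoReconnect) perform the DRBD reconfiguration steps used by _ExecCleanup,
  # _RevertDiskStatus and _ExecMigration: close the devices on a node so it
  # acts as secondary, disconnect the disks from the network, and re-attach
  # them in single- or dual-master mode.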
  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
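    # Descriptive note: poll the hypervisor for the migration status every
    # _MIGRATION_POLL_INTERVAL seconds, emitting a progress message at most
    # every _MIGRATION_FEEDBACK_INTERVAL seconds, until the migration leaves
    # the active state.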
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

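    # Descriptive note: the memory transfer finished; finalize the migration
    # on the source node, then record the new primary node in the
    # configuration and finalize the migration on the target node as well.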
    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()