lib/cmdlib/instance_migration.py @ 87e25be1
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import _ExpandInstanceName, \
  _CheckIAllocatorOrNode, _ExpandNodeName
from ganeti.cmdlib.instance_storage import _CheckDiskConsistency, \
  _ExpandCheckDisks, _ShutdownInstanceDisks, _AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
  _CheckTargetNodeIPolicy, _ReleaseLocks, _CheckNodeNotDrained, \
  _CopyLockList, _CheckNodeFreeMemory, _CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(_BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = _BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
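      # Externally mirrored disks can be reached from any node, so the target
      # node has to be picked explicitly or via an iallocator.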
      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                              ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
                      keep=[instance.primary_node, self.target_node])
        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
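      # Internally mirrored (DRBD) instances can only move to their current
      # secondary node; an iallocator or a different explicit target node is
      # rejected below.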
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                              ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
                                               "migrating instance %s" %
                                               instance.name,
                                               i_be[constants.BE_MINMEM],
                                               instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      _CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
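      # Ask every node for its DRBD resync status; progress is reported for
      # the node that is furthest behind (the minimum percentage).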
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
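      # Externally mirrored disks are not reconfigured during migration, so
      # there is nothing to revert for them.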
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
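      # The target node does not have enough free memory for the instance's
      # current runtime memory; balloon the instance down to what fits, or
      # fail if runtime changes are not allowed.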
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
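      # Only internally mirrored (DRBD) disks need to be reconfigured here;
      # externally mirrored disks are already reachable from the target node.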
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
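    # Poll the hypervisor every _MIGRATION_POLL_INTERVAL seconds, emitting a
    # progress message at most every _MIGRATION_FEEDBACK_INTERVAL seconds,
    # until the migration leaves the ACTIVE state or fails.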
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = _ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting the instance down on its present
    node and starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
                                     False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
                                           ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
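    # all_nodes and nodes_ip are used later by the DRBD state helpers
    # (_WaitUntilSync, _GoStandalone, _GoReconnect) and by the migration and
    # cleanup RPC calls.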

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()