4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Logical units dealing with instance migration and failover."""
import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance
def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  # Node and node-resource locks are computed later via LOCKS_REPLACE
  for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
    lu.needed_locks[level] = []
    lu.recalculate_locks[level] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        # No target node given; let the iallocator pick among all nodes
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        # Explicit target: only the primary and target nodes are needed
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
        del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      # Internally mirrored: node locks come from the instance's own nodes
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    # Failover is modelled as a migration tasklet with failover=True and
    # runtime changes always allowed
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, True,
                        False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])
class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    # Live migration: failover=False, ignore_consistency=False; fallback to
    # failover only if the opcode allows it
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)
class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we cleanup from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """
  # Poll/feedback intervals for the memory-transfer loop in _ExecMigration
  _MIGRATION_POLL_INTERVAL = 1 # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes
278 def CheckPrereq(self):
279 """Check prerequisites.
281 This checks that the instance is in the cluster.
284 instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
285 instance = self.cfg.GetInstanceInfo(instance_name)
286 assert instance is not None
287 self.instance = instance
288 cluster = self.cfg.GetClusterInfo()
290 if (not self.cleanup and
291 not instance.admin_state == constants.ADMINST_UP and
292 not self.failover and self.fallback):
293 self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
294 " switching to failover")
297 if instance.disk_template not in constants.DTS_MIRRORED:
302 raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
303 " %s" % (instance.disk_template, text),
306 if instance.disk_template in constants.DTS_EXT_MIRROR:
307 CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
309 if self.lu.op.iallocator:
310 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
313 # We set set self.target_node as it is required by
315 self.target_node = self.lu.op.target_node
317 # Check that the target node is correct in terms of instance policy
318 nodeinfo = self.cfg.GetNodeInfo(self.target_node)
319 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
320 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
322 CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
323 ignore=self.ignore_ipolicy)
325 # self.target_node is already populated, either directly or by the
327 target_node = self.target_node
328 if self.target_node == instance.primary_node:
329 raise errors.OpPrereqError("Cannot migrate instance %s"
330 " to its primary (%s)" %
331 (instance.name, instance.primary_node),
334 if len(self.lu.tasklets) == 1:
335 # It is safe to release locks only when we're the only tasklet
337 ReleaseLocks(self.lu, locking.LEVEL_NODE,
338 keep=[instance.primary_node, self.target_node])
339 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
342 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
344 secondary_nodes = instance.secondary_nodes
345 if not secondary_nodes:
346 raise errors.ConfigurationError("No secondary node but using"
347 " %s disk template" %
348 instance.disk_template)
349 target_node = secondary_nodes[0]
350 if self.lu.op.iallocator or (self.lu.op.target_node and
351 self.lu.op.target_node != target_node):
356 raise errors.OpPrereqError("Instances with disk template %s cannot"
357 " be %s to arbitrary nodes"
358 " (neither an iallocator nor a target"
359 " node can be passed)" %
360 (instance.disk_template, text),
362 nodeinfo = self.cfg.GetNodeInfo(target_node)
363 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
364 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
366 CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
367 ignore=self.ignore_ipolicy)
369 i_be = cluster.FillBE(instance)
371 # check memory requirements on the secondary node
372 if (not self.cleanup and
373 (not self.failover or instance.admin_state == constants.ADMINST_UP)):
374 self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
375 "migrating instance %s" %
377 i_be[constants.BE_MINMEM],
380 self.lu.LogInfo("Not checking memory on the secondary node as"
381 " instance will not be started")
383 # check if failover must be forced instead of migration
384 if (not self.cleanup and not self.failover and
385 i_be[constants.BE_ALWAYS_FAILOVER]):
386 self.lu.LogInfo("Instance configured to always failover; fallback"
390 # check bridge existance
391 CheckInstanceBridgesExist(self.lu, instance, node=target_node)
394 CheckNodeNotDrained(self.lu, target_node)
395 if not self.failover:
396 result = self.rpc.call_instance_migratable(instance.primary_node,
398 if result.fail_msg and self.fallback:
399 self.lu.LogInfo("Can't migrate, instance offline, fallback to"
403 result.Raise("Can't migrate, please use failover",
404 prereq=True, ecode=errors.ECODE_STATE)
406 assert not (self.failover and self.cleanup)
408 if not self.failover:
409 if self.lu.op.live is not None and self.lu.op.mode is not None:
410 raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
411 " parameters are accepted",
413 if self.lu.op.live is not None:
415 self.lu.op.mode = constants.HT_MIGRATION_LIVE
417 self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
418 # reset the 'live' parameter to None so that repeated
419 # invocations of CheckPrereq do not raise an exception
420 self.lu.op.live = None
421 elif self.lu.op.mode is None:
422 # read the default value from the hypervisor
423 i_hv = cluster.FillHV(self.instance, skip_globals=False)
424 self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
426 self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
428 # Failover is never live
431 if not (self.failover or self.cleanup):
432 remote_info = self.rpc.call_instance_info(instance.primary_node,
435 remote_info.Raise("Error checking instance on node %s" %
436 instance.primary_node)
437 instance_running = bool(remote_info.payload)
439 self.current_mem = int(remote_info.payload["memory"])
441 def _RunAllocator(self):
442 """Run the allocator based on input opcode.
445 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
447 # FIXME: add a self.ignore_ipolicy option
448 req = iallocator.IAReqRelocate(name=self.instance_name,
449 relocate_from=[self.instance.primary_node])
450 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
452 ial.Run(self.lu.op.iallocator)
455 raise errors.OpPrereqError("Can't compute nodes using"
456 " iallocator '%s': %s" %
457 (self.lu.op.iallocator, ial.info),
459 self.target_node = ial.result[0]
460 self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
461 self.instance_name, self.lu.op.iallocator,
462 utils.CommaJoin(ial.result))
464 def _WaitUntilSync(self):
465 """Poll with custom rpc for disk sync.
467 This uses our own step-based rpc call.
470 self.feedback_fn("* wait until resync is done")
474 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
476 (self.instance.disks,
479 for node, nres in result.items():
480 nres.Raise("Cannot resync disks on node %s" % node)
481 node_done, node_percent = nres.payload
482 all_done = all_done and node_done
483 if node_percent is not None:
484 min_percent = min(min_percent, node_percent)
486 if min_percent < 100:
487 self.feedback_fn(" - progress: %.1f%%" % min_percent)
490 def _EnsureSecondary(self, node):
491 """Demote a node to secondary.
494 self.feedback_fn("* switching node %s to secondary mode" % node)
496 for dev in self.instance.disks:
497 self.cfg.SetDiskID(dev, node)
499 result = self.rpc.call_blockdev_close(node, self.instance.name,
501 result.Raise("Cannot change disk to secondary on node %s" % node)
503 def _GoStandalone(self):
504 """Disconnect from the network.
507 self.feedback_fn("* changing into standalone mode")
508 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
510 for node, nres in result.items():
511 nres.Raise("Cannot disconnect disks node %s" % node)
513 def _GoReconnect(self, multimaster):
514 """Reconnect to the network.
520 msg = "single-master"
521 self.feedback_fn("* changing disks into %s mode" % msg)
522 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
523 (self.instance.disks, self.instance),
524 self.instance.name, multimaster)
525 for node, nres in result.items():
526 nres.Raise("Cannot change disks config on node %s" % node)
528 def _ExecCleanup(self):
529 """Try to cleanup after a failed migration.
531 The cleanup is done by:
532 - check that the instance is running only on one node
533 (and update the config if needed)
534 - change disks on its secondary node to secondary
535 - wait until disks are fully synchronized
536 - disconnect from the network
537 - change disks into single-master mode
538 - wait again until disks are fully synchronized
541 instance = self.instance
542 target_node = self.target_node
543 source_node = self.source_node
545 # check running on only one node
546 self.feedback_fn("* checking where the instance actually runs"
547 " (if this hangs, the hypervisor might be in"
549 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
550 for node, result in ins_l.items():
551 result.Raise("Can't contact node %s" % node)
553 runningon_source = instance.name in ins_l[source_node].payload
554 runningon_target = instance.name in ins_l[target_node].payload
556 if runningon_source and runningon_target:
557 raise errors.OpExecError("Instance seems to be running on two nodes,"
558 " or the hypervisor is confused; you will have"
559 " to ensure manually that it runs only on one"
560 " and restart this operation")
562 if not (runningon_source or runningon_target):
563 raise errors.OpExecError("Instance does not seem to be running at all;"
564 " in this case it's safer to repair by"
565 " running 'gnt-instance stop' to ensure disk"
566 " shutdown, and then restarting it")
569 # the migration has actually succeeded, we need to update the config
570 self.feedback_fn("* instance running on secondary node (%s),"
571 " updating config" % target_node)
572 instance.primary_node = target_node
573 self.cfg.Update(instance, self.feedback_fn)
574 demoted_node = source_node
576 self.feedback_fn("* instance confirmed to be running on its"
577 " primary node (%s)" % source_node)
578 demoted_node = target_node
580 if instance.disk_template in constants.DTS_INT_MIRROR:
581 self._EnsureSecondary(demoted_node)
583 self._WaitUntilSync()
584 except errors.OpExecError:
585 # we ignore here errors, since if the device is standalone, it
586 # won't be able to sync
589 self._GoReconnect(False)
590 self._WaitUntilSync()
592 self.feedback_fn("* done")
594 def _RevertDiskStatus(self):
595 """Try to revert the disk status after a failed migration.
598 target_node = self.target_node
599 if self.instance.disk_template in constants.DTS_EXT_MIRROR:
603 self._EnsureSecondary(target_node)
605 self._GoReconnect(False)
606 self._WaitUntilSync()
607 except errors.OpExecError, err:
608 self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
609 " please try to recover the instance manually;"
610 " error '%s'" % str(err))
612 def _AbortMigration(self):
613 """Call the hypervisor code to abort a started migration.
616 instance = self.instance
617 target_node = self.target_node
618 source_node = self.source_node
619 migration_info = self.migration_info
621 abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
625 abort_msg = abort_result.fail_msg
627 logging.error("Aborting migration failed on target node %s: %s",
628 target_node, abort_msg)
629 # Don't raise an exception here, as we stil have to try to revert the
630 # disk status, even if this step failed.
632 abort_result = self.rpc.call_instance_finalize_migration_src(
633 source_node, instance, False, self.live)
634 abort_msg = abort_result.fail_msg
636 logging.error("Aborting migration failed on source node %s: %s",
637 source_node, abort_msg)
639 def _ExecMigration(self):
640 """Migrate an instance.
642 The migrate is done by:
643 - change the disks into dual-master mode
644 - wait until disks are fully synchronized again
645 - migrate the instance
646 - change disks on the new secondary node (the old primary) to secondary
647 - wait until disks are fully synchronized
648 - change disks into single-master mode
651 instance = self.instance
652 target_node = self.target_node
653 source_node = self.source_node
655 # Check for hypervisor version mismatch and warn the user.
656 nodeinfo = self.rpc.call_node_info([source_node, target_node],
657 None, [self.instance.hypervisor], False)
658 for ninfo in nodeinfo.values():
659 ninfo.Raise("Unable to retrieve node information from node '%s'" %
661 (_, _, (src_info, )) = nodeinfo[source_node].payload
662 (_, _, (dst_info, )) = nodeinfo[target_node].payload
664 if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
665 (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
666 src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
667 dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
668 if src_version != dst_version:
669 self.feedback_fn("* warning: hypervisor version mismatch between"
670 " source (%s) and target (%s) node" %
671 (src_version, dst_version))
673 self.feedback_fn("* checking disk consistency between source and target")
674 for (idx, dev) in enumerate(instance.disks):
675 if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
676 raise errors.OpExecError("Disk %s is degraded or not fully"
677 " synchronized on target node,"
678 " aborting migration" % idx)
680 if self.current_mem > self.tgt_free_mem:
681 if not self.allow_runtime_changes:
682 raise errors.OpExecError("Memory ballooning not allowed and not enough"
683 " free memory to fit instance %s on target"
684 " node %s (have %dMB, need %dMB)" %
685 (instance.name, target_node,
686 self.tgt_free_mem, self.current_mem))
687 self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
688 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
691 rpcres.Raise("Cannot modify instance runtime memory")
693 # First get the migration information from the remote node
694 result = self.rpc.call_migration_info(source_node, instance)
695 msg = result.fail_msg
697 log_err = ("Failed fetching source migration information from %s: %s" %
699 logging.error(log_err)
700 raise errors.OpExecError(log_err)
702 self.migration_info = migration_info = result.payload
704 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
705 # Then switch the disks to master/master mode
706 self._EnsureSecondary(target_node)
708 self._GoReconnect(True)
709 self._WaitUntilSync()
711 self.feedback_fn("* preparing %s to accept the instance" % target_node)
712 # This fills physical_id slot that may be missing on newly created disks
713 for disk in instance.disks:
714 self.cfg.SetDiskID(disk, target_node)
715 result = self.rpc.call_accept_instance(target_node,
718 self.nodes_ip[target_node])
720 msg = result.fail_msg
722 logging.error("Instance pre-migration failed, trying to revert"
723 " disk status: %s", msg)
724 self.feedback_fn("Pre-migration failed, aborting")
725 self._AbortMigration()
726 self._RevertDiskStatus()
727 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
728 (instance.name, msg))
730 self.feedback_fn("* migrating instance to %s" % target_node)
731 result = self.rpc.call_instance_migrate(source_node, instance,
732 self.nodes_ip[target_node],
734 msg = result.fail_msg
736 logging.error("Instance migration failed, trying to revert"
737 " disk status: %s", msg)
738 self.feedback_fn("Migration failed, aborting")
739 self._AbortMigration()
740 self._RevertDiskStatus()
741 raise errors.OpExecError("Could not migrate instance %s: %s" %
742 (instance.name, msg))
744 self.feedback_fn("* starting memory transfer")
745 last_feedback = time.time()
747 result = self.rpc.call_instance_get_migration_status(source_node,
749 msg = result.fail_msg
750 ms = result.payload # MigrationStatus instance
751 if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
752 logging.error("Instance migration failed, trying to revert"
753 " disk status: %s", msg)
754 self.feedback_fn("Migration failed, aborting")
755 self._AbortMigration()
756 self._RevertDiskStatus()
758 msg = "hypervisor returned failure"
759 raise errors.OpExecError("Could not migrate instance %s: %s" %
760 (instance.name, msg))
762 if result.payload.status != constants.HV_MIGRATION_ACTIVE:
763 self.feedback_fn("* memory transfer complete")
766 if (utils.TimeoutExpired(last_feedback,
767 self._MIGRATION_FEEDBACK_INTERVAL) and
768 ms.transferred_ram is not None):
769 mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
770 self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
771 last_feedback = time.time()
773 time.sleep(self._MIGRATION_POLL_INTERVAL)
775 result = self.rpc.call_instance_finalize_migration_src(source_node,
779 msg = result.fail_msg
781 logging.error("Instance migration succeeded, but finalization failed"
782 " on the source node: %s", msg)
783 raise errors.OpExecError("Could not finalize instance migration: %s" %
786 instance.primary_node = target_node
788 # distribute new instance config to the other nodes
789 self.cfg.Update(instance, self.feedback_fn)
791 result = self.rpc.call_instance_finalize_migration_dst(target_node,
795 msg = result.fail_msg
797 logging.error("Instance migration succeeded, but finalization failed"
798 " on the target node: %s", msg)
799 raise errors.OpExecError("Could not finalize instance migration: %s" %
802 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
803 self._EnsureSecondary(source_node)
804 self._WaitUntilSync()
806 self._GoReconnect(False)
807 self._WaitUntilSync()
809 # If the instance's disk template is `rbd' or `ext' and there was a
810 # successful migration, unmap the device from the source node.
811 if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
812 disks = ExpandCheckDisks(instance, instance.disks)
813 self.feedback_fn("* unmapping instance's disks from %s" % source_node)
815 result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
816 msg = result.fail_msg
818 logging.error("Migration was successful, but couldn't unmap the"
819 " block device %s on source node %s: %s",
820 disk.iv_name, source_node, msg)
821 logging.error("You need to unmap the device %s manually on %s",
822 disk.iv_name, source_node)
824 self.feedback_fn("* done")
826 def _ExecFailover(self):
827 """Failover an instance.
829 The failover is done by shutting it down on its present node and
830 starting it on the secondary.
833 instance = self.instance
834 primary_node = self.cfg.GetNodeInfo(instance.primary_node)
836 source_node = instance.primary_node
837 target_node = self.target_node
839 if instance.disks_active:
840 self.feedback_fn("* checking disk consistency between source and target")
841 for (idx, dev) in enumerate(instance.disks):
842 # for drbd, these are drbd over lvm
843 if not CheckDiskConsistency(self.lu, instance, dev, target_node,
845 if primary_node.offline:
846 self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
848 (primary_node.name, idx, target_node))
849 elif not self.ignore_consistency:
850 raise errors.OpExecError("Disk %s is degraded on target node,"
851 " aborting failover" % idx)
853 self.feedback_fn("* not checking disk consistency as instance is not"
856 self.feedback_fn("* shutting down instance on source node")
857 logging.info("Shutting down instance %s on node %s",
858 instance.name, source_node)
860 result = self.rpc.call_instance_shutdown(source_node, instance,
861 self.shutdown_timeout,
863 msg = result.fail_msg
865 if self.ignore_consistency or primary_node.offline:
866 self.lu.LogWarning("Could not shutdown instance %s on node %s,"
867 " proceeding anyway; please make sure node"
868 " %s is down; error details: %s",
869 instance.name, source_node, source_node, msg)
871 raise errors.OpExecError("Could not shutdown instance %s on"
873 (instance.name, source_node, msg))
875 self.feedback_fn("* deactivating the instance's disks on source node")
876 if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
877 raise errors.OpExecError("Can't shut down the instance's disks")
879 instance.primary_node = target_node
880 # distribute new instance config to the other nodes
881 self.cfg.Update(instance, self.feedback_fn)
883 # Only start the instance if it's marked as up
884 if instance.admin_state == constants.ADMINST_UP:
885 self.feedback_fn("* activating the instance's disks on target node %s" %
887 logging.info("Starting instance %s on node %s",
888 instance.name, target_node)
890 disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
891 ignore_secondaries=True)
893 ShutdownInstanceDisks(self.lu, instance)
894 raise errors.OpExecError("Can't activate the instance's disks")
896 self.feedback_fn("* starting the instance on the target node %s" %
898 result = self.rpc.call_instance_start(target_node, (instance, None, None),
899 False, self.lu.op.reason)
900 msg = result.fail_msg
902 ShutdownInstanceDisks(self.lu, instance)
903 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
904 (instance.name, target_node, msg))
906 def Exec(self, feedback_fn):
907 """Perform the migration.
910 self.feedback_fn = feedback_fn
911 self.source_node = self.instance.primary_node
913 # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
914 if self.instance.disk_template in constants.DTS_INT_MIRROR:
915 self.target_node = self.instance.secondary_nodes[0]
916 # Otherwise self.target_node has been populated either
917 # directly, or through an iallocator.
919 self.all_nodes = [self.source_node, self.target_node]
920 self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
921 in self.cfg.GetMultiNodeInfo(self.all_nodes))
924 feedback_fn("Failover instance %s" % self.instance.name)
927 feedback_fn("Migrating instance %s" % self.instance.name)
930 return self._ExecCleanup()
932 return self._ExecMigration()