(2.10) Hotplug: cmdlib support
[ganeti-local] lib/cmdlib/instance_migration.py
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 """Logical units dealing with instance migration an failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

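  # Node locks start out empty; they are filled in later by
  # _DeclareLocksForMigration, once the instance object is available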
  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
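      # The node locks are now known; remove the pending recalculation
      # so it does not overwrite them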
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

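    # Positional arguments to TLMigrateInstance below: cleanup,
    # failover=True, fallback=False, ignore_consistency,
    # allow_runtime_changes=True, shutdown_timeout, ignore_ipolicy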
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, True,
                        False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

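    # Positional arguments to TLMigrateInstance below: cleanup,
    # failover=False, fallback=self.op.allow_failover,
    # ignore_consistency=False, allow_runtime_changes,
    # shutdown_timeout, ignore_ipolicy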
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: Timeout for the instance shutdown in case of failover
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

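    # A live migration of an instance that is marked down makes no
    # sense; if a fallback is allowed, degrade to failover instead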
    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

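    # Externally mirrored disks can be moved to an arbitrary node, given
    # either explicitly or via an iallocator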
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
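      # Internally mirrored disks (e.g. DRBD) can only move to the
      # instance's single secondary node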
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
                                              "migrating instance %s" %
                                              instance.name,
                                              i_be[constants.BE_MINMEM],
                                              instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

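    # Determine the migration mode: 'live' and 'mode' are mutually
    # exclusive; if neither is given, the hypervisor's default migration
    # mode is used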
    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters is accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

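    # Closing the instance's block devices on the node effects the
    # demotion to secondary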
    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks on node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

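    # For internally mirrored disks (DRBD), demote the node the instance
    # no longer runs on and bring the disks back to single-master mode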
    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

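    # If the instance's current memory does not fit into the free memory
    # of the target node, balloon it down (if runtime changes are
    # allowed)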
    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    # This fills the physical_id slot that may be missing on newly created
    # disks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
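    # Poll the migration status every _MIGRATION_POLL_INTERVAL seconds
    # and give progress feedback every _MIGRATION_FEEDBACK_INTERVAL
    # seconds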
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shut down instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shut down instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()