1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with instance migration an failover."""
23
24 import logging
25 import time
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti.masterd import iallocator
31 from ganeti import utils
32 from ganeti.cmdlib.base import LogicalUnit, Tasklet
33 from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
34   CheckIAllocatorOrNode, ExpandNodeUuidAndName
35 from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
36   ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
37 from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
38   CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
39   CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
40
41 import ganeti.masterd.instance
42
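# Illustrative sketch (not part of the original module): these logical units
# are normally reached through their opcodes rather than instantiated
# directly.  Roughly, and assuming the standard opcodes and LUXI client API:
#
#   from ganeti import opcodes, luxi
#
#   op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
#                                  cleanup=False, allow_failover=True)
#   job_id = luxi.Client().SubmitJob([op])
#
# The exact opcode fields are defined in ganeti/opcodes.py; this module only
# relies on the fields it reads from the opcode (e.g. cleanup, allow_failover,
# target_node, ignore_ipolicy).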
43
44 def _ExpandNamesForMigration(lu):
45   """Expands names for use with L{TLMigrateInstance}.
46
47   @type lu: L{LogicalUnit}
48
49   """
50   if lu.op.target_node is not None:
51     (lu.op.target_node_uuid, lu.op.target_node) = \
52       ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
53
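  # The node lock lists are intentionally left empty here; the real node
  # locks are computed later in _DeclareLocksForMigration, and LOCKS_REPLACE
  # makes that recalculation replace these placeholders.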
54   lu.needed_locks[locking.LEVEL_NODE] = []
55   lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
56
57   lu.needed_locks[locking.LEVEL_NODE_RES] = []
58   lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
59
60   # The node allocation lock is actually only needed for externally replicated
61   # instances (e.g. sharedfile or RBD) and if an iallocator is used.
62   lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
63
64
65 def _DeclareLocksForMigration(lu, level):
66   """Declares locks for L{TLMigrateInstance}.
67
68   @type lu: L{LogicalUnit}
69   @param level: Lock level
70
71   """
72   if level == locking.LEVEL_NODE_ALLOC:
73     assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
74
75     instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)
76
77     # Node locks are already declared here rather than at LEVEL_NODE as we need
78     # the instance object anyway to declare the node allocation lock.
79     if instance.disk_template in constants.DTS_EXT_MIRROR:
80       if lu.op.target_node is None:
81         lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
82         lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
83       else:
84         lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
85                                                lu.op.target_node_uuid]
86       del lu.recalculate_locks[locking.LEVEL_NODE]
87     else:
88       lu._LockInstancesNodes() # pylint: disable=W0212
89
90   elif level == locking.LEVEL_NODE:
91     # Node locks are declared together with the node allocation lock
92     assert (lu.needed_locks[locking.LEVEL_NODE] or
93             lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
94
95   elif level == locking.LEVEL_NODE_RES:
96     # Copy node locks
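    # so that LEVEL_NODE_RES always covers exactly the nodes locked at
    # LEVEL_NODE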
97     lu.needed_locks[locking.LEVEL_NODE_RES] = \
98       CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
99
100
101 class LUInstanceFailover(LogicalUnit):
102   """Failover an instance.
103
104   """
105   HPATH = "instance-failover"
106   HTYPE = constants.HTYPE_INSTANCE
107   REQ_BGL = False
108
109   def CheckArguments(self):
110     """Check the arguments.
111
112     """
113     self.iallocator = getattr(self.op, "iallocator", None)
114     self.target_node = getattr(self.op, "target_node", None)
115
116   def ExpandNames(self):
117     self._ExpandAndLockInstance()
118     _ExpandNamesForMigration(self)
119
120     self._migrater = \
121       TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
122                         self.op.cleanup, True, False,
123                         self.op.ignore_consistency, True,
124                         self.op.shutdown_timeout, self.op.ignore_ipolicy)
125
126     self.tasklets = [self._migrater]
127
128   def DeclareLocks(self, level):
129     _DeclareLocksForMigration(self, level)
130
131   def BuildHooksEnv(self):
132     """Build hooks env.
133
134     This runs on master, primary and secondary nodes of the instance.
135
136     """
137     instance = self._migrater.instance
138     source_node_uuid = instance.primary_node
139     env = {
140       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
141       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
142       "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
143       "NEW_PRIMARY": self.op.target_node,
144       "FAILOVER_CLEANUP": self.op.cleanup,
145       }
146
147     if instance.disk_template in constants.DTS_INT_MIRROR:
148       env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
149       env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
150     else:
151       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
152
153     env.update(BuildInstanceHookEnvByObject(self, instance))
154
155     return env
156
157   def BuildHooksNodes(self):
158     """Build hooks nodes.
159
160     """
161     instance = self._migrater.instance
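    # The primary node is only included in the post-hook list, presumably
    # because it may be unreachable while the failover is being executed.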
162     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
163     return (nl, nl + [instance.primary_node])
164
165
166 class LUInstanceMigrate(LogicalUnit):
167   """Migrate an instance.
168
169   This is migration without shutting the instance down, as opposed to
170   failover, which requires a shutdown.
171
172   """
173   HPATH = "instance-migrate"
174   HTYPE = constants.HTYPE_INSTANCE
175   REQ_BGL = False
176
177   def ExpandNames(self):
178     self._ExpandAndLockInstance()
179     _ExpandNamesForMigration(self)
180
181     self._migrater = \
182       TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
183                         self.op.cleanup, False, self.op.allow_failover, False,
184                         self.op.allow_runtime_changes,
185                         constants.DEFAULT_SHUTDOWN_TIMEOUT,
186                         self.op.ignore_ipolicy)
187
188     self.tasklets = [self._migrater]
189
190   def DeclareLocks(self, level):
191     _DeclareLocksForMigration(self, level)
192
193   def BuildHooksEnv(self):
194     """Build hooks env.
195
196     This runs on master, primary and secondary nodes of the instance.
197
198     """
199     instance = self._migrater.instance
200     source_node_uuid = instance.primary_node
201     env = BuildInstanceHookEnvByObject(self, instance)
202     env.update({
203       "MIGRATE_LIVE": self._migrater.live,
204       "MIGRATE_CLEANUP": self.op.cleanup,
205       "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
206       "NEW_PRIMARY": self.op.target_node,
207       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
208       })
209
210     if instance.disk_template in constants.DTS_INT_MIRROR:
211       env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
212       env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
213     else:
214       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
215
216     return env
217
218   def BuildHooksNodes(self):
219     """Build hooks nodes.
220
221     """
222     instance = self._migrater.instance
223     snode_uuids = list(instance.secondary_nodes)
224     nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
225     return (nl, nl)
226
227
228 class TLMigrateInstance(Tasklet):
229   """Tasklet class for instance migration.
230
231   @type live: boolean
232   @ivar live: whether the migration will be done live or non-live;
233       this variable is initialized only after CheckPrereq has run
234   @type cleanup: boolean
235   @ivar cleanup: Whether we clean up after a failed migration
236   @type iallocator: string
237   @ivar iallocator: The iallocator used to determine target_node
238   @type target_node_uuid: string
239   @ivar target_node_uuid: If given, the target node UUID to reallocate the
240       instance to
241   @type failover: boolean
242   @ivar failover: Whether operation results in failover or migration
243   @type fallback: boolean
244   @ivar fallback: Whether fallback to failover is allowed if migration not
245                   possible
246   @type ignore_consistency: boolean
247   @ivar ignore_consistency: Whether we should ignore consistency between source
248                             and target node
249   @type shutdown_timeout: int
250   @ivar shutdown_timeout: Timeout for the shutdown in case of failover
251   @type ignore_ipolicy: bool
252   @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
253
254   """
255
256   # Constants
257   _MIGRATION_POLL_INTERVAL = 1      # seconds
258   _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
259
260   def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
261                fallback, ignore_consistency, allow_runtime_changes,
262                shutdown_timeout, ignore_ipolicy):
263     """Initializes this class.
264
265     """
266     Tasklet.__init__(self, lu)
267
268     # Parameters
269     self.instance_uuid = instance_uuid
270     self.instance_name = instance_name
271     self.cleanup = cleanup
272     self.live = False # will be overridden later
273     self.failover = failover
274     self.fallback = fallback
275     self.ignore_consistency = ignore_consistency
276     self.shutdown_timeout = shutdown_timeout
277     self.ignore_ipolicy = ignore_ipolicy
278     self.allow_runtime_changes = allow_runtime_changes
279
280   def CheckPrereq(self):
281     """Check prerequisites.
282
283     This checks that the instance is in the cluster.
284
285     """
286     (self.instance_uuid, self.instance_name) = \
287       ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
288                                 self.instance_name)
289     self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
290     assert self.instance is not None
291     cluster = self.cfg.GetClusterInfo()
292
293     if (not self.cleanup and
294         not self.instance.admin_state == constants.ADMINST_UP and
295         not self.failover and self.fallback):
296       self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
297                       " switching to failover")
298       self.failover = True
299
300     if self.instance.disk_template not in constants.DTS_MIRRORED:
301       if self.failover:
302         text = "failovers"
303       else:
304         text = "migrations"
305       raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
306                                  " %s" % (self.instance.disk_template, text),
307                                  errors.ECODE_STATE)
308
309     if self.instance.disk_template in constants.DTS_EXT_MIRROR:
310       CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
311
312       if self.lu.op.iallocator:
313         assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
314         self._RunAllocator()
315       else:
316         # We set self.target_node_uuid as it is required by
317         # BuildHooksEnv
318         self.target_node_uuid = self.lu.op.target_node_uuid
319
320       # Check that the target node is correct in terms of instance policy
321       nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
322       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
323       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
324                                                               group_info)
325       CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
326                              self.cfg, ignore=self.ignore_ipolicy)
327
328       # self.target_node_uuid is already populated, either directly or by the
329       # iallocator run
330       target_node_uuid = self.target_node_uuid
331       if self.target_node_uuid == self.instance.primary_node:
332         raise errors.OpPrereqError(
333           "Cannot migrate instance %s to its primary (%s)" %
334           (self.instance.name,
335            self.cfg.GetNodeName(self.instance.primary_node)),
336           errors.ECODE_STATE)
337
338       if len(self.lu.tasklets) == 1:
339         # It is safe to release locks only when we're the only tasklet
340         # in the LU
341         ReleaseLocks(self.lu, locking.LEVEL_NODE,
342                      keep=[self.instance.primary_node, self.target_node_uuid])
343         ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
344
345     else:
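      # Internally mirrored (DRBD) instances can only be moved to their
      # current secondary node; neither an iallocator nor an arbitrary
      # target node is accepted.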
346       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
347
348       secondary_node_uuids = self.instance.secondary_nodes
349       if not secondary_node_uuids:
350         raise errors.ConfigurationError("No secondary node but using"
351                                         " %s disk template" %
352                                         self.instance.disk_template)
353       target_node_uuid = secondary_node_uuids[0]
354       if self.lu.op.iallocator or \
355         (self.lu.op.target_node_uuid and
356          self.lu.op.target_node_uuid != target_node_uuid):
357         if self.failover:
358           text = "failed over"
359         else:
360           text = "migrated"
361         raise errors.OpPrereqError("Instances with disk template %s cannot"
362                                    " be %s to arbitrary nodes"
363                                    " (neither an iallocator nor a target"
364                                    " node can be passed)" %
365                                    (self.instance.disk_template, text),
366                                    errors.ECODE_INVAL)
367       nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
368       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
369       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
370                                                               group_info)
371       CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
372                              self.cfg, ignore=self.ignore_ipolicy)
373
374     i_be = cluster.FillBE(self.instance)
375
376     # check memory requirements on the secondary node
377     if (not self.cleanup and
378          (not self.failover or
379            self.instance.admin_state == constants.ADMINST_UP)):
380       self.tgt_free_mem = CheckNodeFreeMemory(
381           self.lu, target_node_uuid,
382           "migrating instance %s" % self.instance.name,
383           i_be[constants.BE_MINMEM], self.instance.hypervisor,
384           self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
385     else:
386       self.lu.LogInfo("Not checking memory on the secondary node as"
387                       " instance will not be started")
388
389     # check if failover must be forced instead of migration
390     if (not self.cleanup and not self.failover and
391         i_be[constants.BE_ALWAYS_FAILOVER]):
392       self.lu.LogInfo("Instance configured to always failover; fallback"
393                       " to failover")
394       self.failover = True
395
396     # check bridge existence
397     CheckInstanceBridgesExist(self.lu, self.instance,
398                               node_uuid=target_node_uuid)
399
400     if not self.cleanup:
401       CheckNodeNotDrained(self.lu, target_node_uuid)
402       if not self.failover:
403         result = self.rpc.call_instance_migratable(self.instance.primary_node,
404                                                    self.instance)
405         if result.fail_msg and self.fallback:
406           self.lu.LogInfo("Can't migrate, instance offline, fallback to"
407                           " failover")
408           self.failover = True
409         else:
410           result.Raise("Can't migrate, please use failover",
411                        prereq=True, ecode=errors.ECODE_STATE)
412
413     assert not (self.failover and self.cleanup)
414
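    # Resolve the requested migration mode: an explicit 'live' flag wins,
    # then an explicit 'mode', otherwise the hypervisor's default mode is
    # used.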
415     if not self.failover:
416       if self.lu.op.live is not None and self.lu.op.mode is not None:
417         raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
418                                    " parameters is accepted",
419                                    errors.ECODE_INVAL)
420       if self.lu.op.live is not None:
421         if self.lu.op.live:
422           self.lu.op.mode = constants.HT_MIGRATION_LIVE
423         else:
424           self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
425         # reset the 'live' parameter to None so that repeated
426         # invocations of CheckPrereq do not raise an exception
427         self.lu.op.live = None
428       elif self.lu.op.mode is None:
429         # read the default value from the hypervisor
430         i_hv = cluster.FillHV(self.instance, skip_globals=False)
431         self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
432
433       self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
434     else:
435       # Failover is never live
436       self.live = False
437
438     if not (self.failover or self.cleanup):
439       remote_info = self.rpc.call_instance_info(
440           self.instance.primary_node, self.instance.name,
441           self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
442       remote_info.Raise("Error checking instance on node %s" %
443                         self.cfg.GetNodeName(self.instance.primary_node))
444       instance_running = bool(remote_info.payload)
445       if instance_running:
446         self.current_mem = int(remote_info.payload["memory"])
447
448   def _RunAllocator(self):
449     """Run the allocator based on input opcode.
450
451     """
452     assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
453
454     # FIXME: add a self.ignore_ipolicy option
455     req = iallocator.IAReqRelocate(
456           inst_uuid=self.instance_uuid,
457           relocate_from_node_uuids=[self.instance.primary_node])
458     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
459
460     ial.Run(self.lu.op.iallocator)
461
462     if not ial.success:
463       raise errors.OpPrereqError("Can't compute nodes using"
464                                  " iallocator '%s': %s" %
465                                  (self.lu.op.iallocator, ial.info),
466                                  errors.ECODE_NORES)
467     self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
468     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
469                     self.instance_name, self.lu.op.iallocator,
470                     utils.CommaJoin(ial.result))
471
472   def _WaitUntilSync(self):
473     """Poll with custom rpc for disk sync.
474
475     This uses our own step-based rpc call.
476
477     """
478     self.feedback_fn("* wait until resync is done")
479     all_done = False
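    # Poll all nodes until every one of them reports its disks as fully
    # synchronized; progress is reported as the minimum percentage seen
    # across the nodes.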
480     while not all_done:
481       all_done = True
482       result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
483                                             self.nodes_ip,
484                                             (self.instance.disks,
485                                              self.instance))
486       min_percent = 100
487       for node_uuid, nres in result.items():
488         nres.Raise("Cannot resync disks on node %s" %
489                    self.cfg.GetNodeName(node_uuid))
490         node_done, node_percent = nres.payload
491         all_done = all_done and node_done
492         if node_percent is not None:
493           min_percent = min(min_percent, node_percent)
494       if not all_done:
495         if min_percent < 100:
496           self.feedback_fn("   - progress: %.1f%%" % min_percent)
497         time.sleep(2)
498
499   def _EnsureSecondary(self, node_uuid):
500     """Demote a node to secondary.
501
502     """
503     self.feedback_fn("* switching node %s to secondary mode" %
504                      self.cfg.GetNodeName(node_uuid))
505
506     for dev in self.instance.disks:
507       self.cfg.SetDiskID(dev, node_uuid)
508
509     result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
510                                           self.instance.disks)
511     result.Raise("Cannot change disk to secondary on node %s" %
512                  self.cfg.GetNodeName(node_uuid))
513
514   def _GoStandalone(self):
515     """Disconnect from the network.
516
517     """
518     self.feedback_fn("* changing into standalone mode")
519     result = self.rpc.call_drbd_disconnect_net(self.all_node_uuids,
520                                                self.nodes_ip,
521                                                self.instance.disks)
522     for node_uuid, nres in result.items():
523       nres.Raise("Cannot disconnect disks node %s" %
524                  self.cfg.GetNodeName(node_uuid))
525
526   def _GoReconnect(self, multimaster):
527     """Reconnect to the network.
528
529     """
530     if multimaster:
531       msg = "dual-master"
532     else:
533       msg = "single-master"
534     self.feedback_fn("* changing disks into %s mode" % msg)
535     result = self.rpc.call_drbd_attach_net(self.all_node_uuids, self.nodes_ip,
536                                            (self.instance.disks, self.instance),
537                                            self.instance.name, multimaster)
538     for node_uuid, nres in result.items():
539       nres.Raise("Cannot change disks config on node %s" %
540                  self.cfg.GetNodeName(node_uuid))
541
542   def _ExecCleanup(self):
543     """Try to cleanup after a failed migration.
544
545     The cleanup is done by:
546       - check that the instance is running only on one node
547         (and update the config if needed)
548       - change disks on its secondary node to secondary
549       - wait until disks are fully synchronized
550       - disconnect from the network
551       - change disks into single-master mode
552       - wait again until disks are fully synchronized
553
554     """
555     # check running on only one node
556     self.feedback_fn("* checking where the instance actually runs"
557                      " (if this hangs, the hypervisor might be in"
558                      " a bad state)")
559     cluster_hvparams = self.cfg.GetClusterInfo().hvparams
560     ins_l = self.rpc.call_instance_list(self.all_node_uuids,
561                                         [self.instance.hypervisor],
562                                         cluster_hvparams)
563     for node_uuid, result in ins_l.items():
564       result.Raise("Can't contact node %s" % node_uuid)
565
566     runningon_source = self.instance.name in \
567                          ins_l[self.source_node_uuid].payload
568     runningon_target = self.instance.name in \
569                          ins_l[self.target_node_uuid].payload
570
571     if runningon_source and runningon_target:
572       raise errors.OpExecError("Instance seems to be running on two nodes,"
573                                " or the hypervisor is confused; you will have"
574                                " to ensure manually that it runs only on one"
575                                " and restart this operation")
576
577     if not (runningon_source or runningon_target):
578       raise errors.OpExecError("Instance does not seem to be running at all;"
579                                " in this case it's safer to repair by"
580                                " running 'gnt-instance stop' to ensure disk"
581                                " shutdown, and then restarting it")
582
583     if runningon_target:
584       # the migration has actually succeeded, we need to update the config
585       self.feedback_fn("* instance running on secondary node (%s),"
586                        " updating config" %
587                        self.cfg.GetNodeName(self.target_node_uuid))
588       self.instance.primary_node = self.target_node_uuid
589       self.cfg.Update(self.instance, self.feedback_fn)
590       demoted_node_uuid = self.source_node_uuid
591     else:
592       self.feedback_fn("* instance confirmed to be running on its"
593                        " primary node (%s)" %
594                        self.cfg.GetNodeName(self.source_node_uuid))
595       demoted_node_uuid = self.target_node_uuid
596
597     if self.instance.disk_template in constants.DTS_INT_MIRROR:
598       self._EnsureSecondary(demoted_node_uuid)
599       try:
600         self._WaitUntilSync()
601       except errors.OpExecError:
602         # we ignore errors here, since if the device is standalone, it
603         # won't be able to sync
604         pass
605       self._GoStandalone()
606       self._GoReconnect(False)
607       self._WaitUntilSync()
608
609     self.feedback_fn("* done")
610
611   def _RevertDiskStatus(self):
612     """Try to revert the disk status after a failed migration.
613
614     """
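    # Externally mirrored templates have no DRBD peers to reconfigure, so
    # there is nothing to revert for them.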
615     if self.instance.disk_template in constants.DTS_EXT_MIRROR:
616       return
617
618     try:
619       self._EnsureSecondary(self.target_node_uuid)
620       self._GoStandalone()
621       self._GoReconnect(False)
622       self._WaitUntilSync()
623     except errors.OpExecError, err:
624       self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
625                          " please try to recover the instance manually;"
626                          " error '%s'" % str(err))
627
628   def _AbortMigration(self):
629     """Call the hypervisor code to abort a started migration.
630
631     """
632     abort_result = self.rpc.call_instance_finalize_migration_dst(
633                      self.target_node_uuid, self.instance, self.migration_info,
634                      False)
635     abort_msg = abort_result.fail_msg
636     if abort_msg:
637       logging.error("Aborting migration failed on target node %s: %s",
638                     self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
639       # Don't raise an exception here, as we still have to try to revert the
640       # disk status, even if this step failed.
641
642     abort_result = self.rpc.call_instance_finalize_migration_src(
643       self.source_node_uuid, self.instance, False, self.live)
644     abort_msg = abort_result.fail_msg
645     if abort_msg:
646       logging.error("Aborting migration failed on source node %s: %s",
647                     self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
648
649   def _ExecMigration(self):
650     """Migrate an instance.
651
652     The migrate is done by:
653       - change the disks into dual-master mode
654       - wait until disks are fully synchronized again
655       - migrate the instance
656       - change disks on the new secondary node (the old primary) to secondary
657       - wait until disks are fully synchronized
658       - change disks into single-master mode
659
660     """
661     # Check for hypervisor version mismatch and warn the user.
662     hvspecs = [(self.instance.hypervisor,
663                 self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
664     nodeinfo = self.rpc.call_node_info(
665                  [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
666     for ninfo in nodeinfo.values():
667       ninfo.Raise("Unable to retrieve node information from node '%s'" %
668                   ninfo.node)
669     (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
670     (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload
671
672     if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
673         (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
674       src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
675       dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
676       if src_version != dst_version:
677         self.feedback_fn("* warning: hypervisor version mismatch between"
678                          " source (%s) and target (%s) node" %
679                          (src_version, dst_version))
680
681     self.feedback_fn("* checking disk consistency between source and target")
682     for (idx, dev) in enumerate(self.instance.disks):
683       if not CheckDiskConsistency(self.lu, self.instance, dev,
684                                   self.target_node_uuid,
685                                   False):
686         raise errors.OpExecError("Disk %s is degraded or not fully"
687                                  " synchronized on target node,"
688                                  " aborting migration" % idx)
689
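    # If the instance currently uses more memory than the target node has
    # free, balloon it down to fit (only permitted when runtime changes are
    # allowed).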
690     if self.current_mem > self.tgt_free_mem:
691       if not self.allow_runtime_changes:
692         raise errors.OpExecError("Memory ballooning not allowed and not enough"
693                                  " free memory to fit instance %s on target"
694                                  " node %s (have %dMB, need %dMB)" %
695                                  (self.instance.name,
696                                   self.cfg.GetNodeName(self.target_node_uuid),
697                                   self.tgt_free_mem, self.current_mem))
698       self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
699       rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
700                                                      self.instance,
701                                                      self.tgt_free_mem)
702       rpcres.Raise("Cannot modify instance runtime memory")
703
704     # First get the migration information from the remote node
705     result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
706     msg = result.fail_msg
707     if msg:
708       log_err = ("Failed fetching source migration information from %s: %s" %
709                  (self.cfg.GetNodeName(self.source_node_uuid), msg))
710       logging.error(log_err)
711       raise errors.OpExecError(log_err)
712
713     self.migration_info = migration_info = result.payload
714
715     if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
716       # Then switch the disks to master/master mode
717       self._EnsureSecondary(self.target_node_uuid)
718       self._GoStandalone()
719       self._GoReconnect(True)
720       self._WaitUntilSync()
721
722     self.feedback_fn("* preparing %s to accept the instance" %
723                      self.cfg.GetNodeName(self.target_node_uuid))
724     result = self.rpc.call_accept_instance(self.target_node_uuid,
725                                            self.instance,
726                                            migration_info,
727                                            self.nodes_ip[self.target_node_uuid])
728
729     msg = result.fail_msg
730     if msg:
731       logging.error("Instance pre-migration failed, trying to revert"
732                     " disk status: %s", msg)
733       self.feedback_fn("Pre-migration failed, aborting")
734       self._AbortMigration()
735       self._RevertDiskStatus()
736       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
737                                (self.instance.name, msg))
738
739     self.feedback_fn("* migrating instance to %s" %
740                      self.cfg.GetNodeName(self.target_node_uuid))
741     cluster = self.cfg.GetClusterInfo()
742     result = self.rpc.call_instance_migrate(
743         self.source_node_uuid, cluster.cluster_name, self.instance,
744         self.nodes_ip[self.target_node_uuid], self.live)
745     msg = result.fail_msg
746     if msg:
747       logging.error("Instance migration failed, trying to revert"
748                     " disk status: %s", msg)
749       self.feedback_fn("Migration failed, aborting")
750       self._AbortMigration()
751       self._RevertDiskStatus()
752       raise errors.OpExecError("Could not migrate instance %s: %s" %
753                                (self.instance.name, msg))
754
755     self.feedback_fn("* starting memory transfer")
756     last_feedback = time.time()
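    # Poll the source node every _MIGRATION_POLL_INTERVAL seconds and report
    # transfer progress at most every _MIGRATION_FEEDBACK_INTERVAL seconds.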
757     while True:
758       result = self.rpc.call_instance_get_migration_status(
759                  self.source_node_uuid, self.instance)
760       msg = result.fail_msg
761       ms = result.payload   # MigrationStatus instance
762       if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
763         logging.error("Instance migration failed, trying to revert"
764                       " disk status: %s", msg)
765         self.feedback_fn("Migration failed, aborting")
766         self._AbortMigration()
767         self._RevertDiskStatus()
768         if not msg:
769           msg = "hypervisor returned failure"
770         raise errors.OpExecError("Could not migrate instance %s: %s" %
771                                  (self.instance.name, msg))
772
773       if result.payload.status != constants.HV_MIGRATION_ACTIVE:
774         self.feedback_fn("* memory transfer complete")
775         break
776
777       if (utils.TimeoutExpired(last_feedback,
778                                self._MIGRATION_FEEDBACK_INTERVAL) and
779           ms.transferred_ram is not None):
780         mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
781         self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
782         last_feedback = time.time()
783
784       time.sleep(self._MIGRATION_POLL_INTERVAL)
785
786     result = self.rpc.call_instance_finalize_migration_src(
787                self.source_node_uuid, self.instance, True, self.live)
788     msg = result.fail_msg
789     if msg:
790       logging.error("Instance migration succeeded, but finalization failed"
791                     " on the source node: %s", msg)
792       raise errors.OpExecError("Could not finalize instance migration: %s" %
793                                msg)
794
795     self.instance.primary_node = self.target_node_uuid
796
797     # distribute new instance config to the other nodes
798     self.cfg.Update(self.instance, self.feedback_fn)
799
800     result = self.rpc.call_instance_finalize_migration_dst(
801                self.target_node_uuid, self.instance, migration_info, True)
802     msg = result.fail_msg
803     if msg:
804       logging.error("Instance migration succeeded, but finalization failed"
805                     " on the target node: %s", msg)
806       raise errors.OpExecError("Could not finalize instance migration: %s" %
807                                msg)
808
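    # For internally mirrored disks, demote the old primary and switch the
    # DRBD pair back to single-master mode now that the instance runs on the
    # new primary.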
809     if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
810       self._EnsureSecondary(self.source_node_uuid)
811       self._WaitUntilSync()
812       self._GoStandalone()
813       self._GoReconnect(False)
814       self._WaitUntilSync()
815
816     # If the instance's disk template is `rbd' or `ext' and there was a
817     # successful migration, unmap the device from the source node.
818     if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
819       disks = ExpandCheckDisks(self.instance, self.instance.disks)
820       self.feedback_fn("* unmapping instance's disks from %s" %
821                        self.cfg.GetNodeName(self.source_node_uuid))
822       for disk in disks:
823         result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
824                                                  (disk, self.instance))
825         msg = result.fail_msg
826         if msg:
827           logging.error("Migration was successful, but couldn't unmap the"
828                         " block device %s on source node %s: %s",
829                         disk.iv_name,
830                         self.cfg.GetNodeName(self.source_node_uuid), msg)
831           logging.error("You need to unmap the device %s manually on %s",
832                         disk.iv_name,
833                         self.cfg.GetNodeName(self.source_node_uuid))
834
835     self.feedback_fn("* done")
836
837   def _ExecFailover(self):
838     """Failover an instance.
839
840     The failover is done by shutting the instance down on its present
841     node and starting it on the secondary.
842
843     """
844     primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)
845
846     source_node_uuid = self.instance.primary_node
847
848     if self.instance.disks_active:
849       self.feedback_fn("* checking disk consistency between source and target")
850       for (idx, dev) in enumerate(self.instance.disks):
851         # for drbd, these are drbd over lvm
852         if not CheckDiskConsistency(self.lu, self.instance, dev,
853                                     self.target_node_uuid, False):
854           if primary_node.offline:
855             self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
856                              " target node %s" %
857                              (primary_node.name, idx,
858                               self.cfg.GetNodeName(self.target_node_uuid)))
859           elif not self.ignore_consistency:
860             raise errors.OpExecError("Disk %s is degraded on target node,"
861                                      " aborting failover" % idx)
862     else:
863       self.feedback_fn("* not checking disk consistency as instance is not"
864                        " running")
865
866     self.feedback_fn("* shutting down instance on source node")
867     logging.info("Shutting down instance %s on node %s",
868                  self.instance.name, self.cfg.GetNodeName(source_node_uuid))
869
870     result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
871                                              self.shutdown_timeout,
872                                              self.lu.op.reason)
873     msg = result.fail_msg
874     if msg:
875       if self.ignore_consistency or primary_node.offline:
876         self.lu.LogWarning("Could not shutdown instance %s on node %s,"
877                            " proceeding anyway; please make sure node"
878                            " %s is down; error details: %s",
879                            self.instance.name,
880                            self.cfg.GetNodeName(source_node_uuid),
881                            self.cfg.GetNodeName(source_node_uuid), msg)
882       else:
883         raise errors.OpExecError("Could not shutdown instance %s on"
884                                  " node %s: %s" %
885                                  (self.instance.name,
886                                   self.cfg.GetNodeName(source_node_uuid), msg))
887
888     self.feedback_fn("* deactivating the instance's disks on source node")
889     if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
890       raise errors.OpExecError("Can't shut down the instance's disks")
891
892     self.instance.primary_node = self.target_node_uuid
893     # distribute new instance config to the other nodes
894     self.cfg.Update(self.instance, self.feedback_fn)
895
896     # Only start the instance if it's marked as up
897     if self.instance.admin_state == constants.ADMINST_UP:
898       self.feedback_fn("* activating the instance's disks on target node %s" %
899                        self.cfg.GetNodeName(self.target_node_uuid))
900       logging.info("Starting instance %s on node %s", self.instance.name,
901                    self.cfg.GetNodeName(self.target_node_uuid))
902
903       disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
904                                           ignore_secondaries=True)
905       if not disks_ok:
906         ShutdownInstanceDisks(self.lu, self.instance)
907         raise errors.OpExecError("Can't activate the instance's disks")
908
909       self.feedback_fn("* starting the instance on the target node %s" %
910                        self.cfg.GetNodeName(self.target_node_uuid))
911       result = self.rpc.call_instance_start(self.target_node_uuid,
912                                             (self.instance, None, None), False,
913                                             self.lu.op.reason)
914       msg = result.fail_msg
915       if msg:
916         ShutdownInstanceDisks(self.lu, self.instance)
917         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
918                                  (self.instance.name,
919                                   self.cfg.GetNodeName(self.target_node_uuid),
920                                   msg))
921
922   def Exec(self, feedback_fn):
923     """Perform the migration.
924
925     """
926     self.feedback_fn = feedback_fn
927     self.source_node_uuid = self.instance.primary_node
928
929     # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
930     if self.instance.disk_template in constants.DTS_INT_MIRROR:
931       self.target_node_uuid = self.instance.secondary_nodes[0]
932       # Otherwise self.target_node_uuid has been populated either
933       # directly, or through an iallocator.
934
935     self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
936     self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
937                          in self.cfg.GetMultiNodeInfo(self.all_node_uuids))
938
939     if self.failover:
940       feedback_fn("Failover instance %s" % self.instance.name)
941       self._ExecFailover()
942     else:
943       feedback_fn("Migrating instance %s" % self.instance.name)
944
945       if self.cleanup:
946         return self._ExecCleanup()
947       else:
948         return self._ExecMigration()