Fix formatting of instance names in config verify
[ganeti-local] / lib / cmdlib / instance_migration.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with instance migration an failover."""
23
24 import logging
25 import time
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti.masterd import iallocator
31 from ganeti import utils
32 from ganeti.cmdlib.base import LogicalUnit, Tasklet
33 from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
34   CheckIAllocatorOrNode, ExpandNodeUuidAndName
35 from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
36   ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
37 from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
38   CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
39   CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
40
41 import ganeti.masterd.instance
42
43
44 def _ExpandNamesForMigration(lu):
45   """Expands names for use with L{TLMigrateInstance}.
46
47   @type lu: L{LogicalUnit}
48
49   """
50   if lu.op.target_node is not None:
51     (lu.op.target_node_uuid, lu.op.target_node) = \
52       ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
53
54   lu.needed_locks[locking.LEVEL_NODE] = []
55   lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
56
57   lu.needed_locks[locking.LEVEL_NODE_RES] = []
58   lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
59
60   # The node allocation lock is actually only needed for externally replicated
61   # instances (e.g. sharedfile or RBD) and if an iallocator is used.
62   lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
63
64
65 def _DeclareLocksForMigration(lu, level):
66   """Declares locks for L{TLMigrateInstance}.
67
68   @type lu: L{LogicalUnit}
69   @param level: Lock level
70
71   """
72   if level == locking.LEVEL_NODE_ALLOC:
73     assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
74
75     instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)
76
77     # Node locks are already declared here rather than at LEVEL_NODE as we need
78     # the instance object anyway to declare the node allocation lock.
79     if instance.disk_template in constants.DTS_EXT_MIRROR:
80       if lu.op.target_node is None:
81         lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
82         lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
83       else:
84         lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
85                                                lu.op.target_node_uuid]
86       del lu.recalculate_locks[locking.LEVEL_NODE]
87     else:
88       lu._LockInstancesNodes() # pylint: disable=W0212
89
90   elif level == locking.LEVEL_NODE:
91     # Node locks are declared together with the node allocation lock
92     assert (lu.needed_locks[locking.LEVEL_NODE] or
93             lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
94
95   elif level == locking.LEVEL_NODE_RES:
96     # Copy node locks
97     lu.needed_locks[locking.LEVEL_NODE_RES] = \
98       CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
99
100
101 class LUInstanceFailover(LogicalUnit):
102   """Failover an instance.
103
104   """
105   HPATH = "instance-failover"
106   HTYPE = constants.HTYPE_INSTANCE
107   REQ_BGL = False
108
109   def CheckArguments(self):
110     """Check the arguments.
111
112     """
113     self.iallocator = getattr(self.op, "iallocator", None)
114     self.target_node = getattr(self.op, "target_node", None)
115
116   def ExpandNames(self):
117     self._ExpandAndLockInstance()
118     _ExpandNamesForMigration(self)
119
120     self._migrater = \
121       TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
122                         False, True, False, self.op.ignore_consistency, True,
123                         self.op.shutdown_timeout, self.op.ignore_ipolicy)
124
125     self.tasklets = [self._migrater]
126
127   def DeclareLocks(self, level):
128     _DeclareLocksForMigration(self, level)
129
130   def BuildHooksEnv(self):
131     """Build hooks env.
132
133     This runs on master, primary and secondary nodes of the instance.
134
135     """
136     instance = self._migrater.instance
137     source_node_uuid = instance.primary_node
138     env = {
139       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
140       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
141       "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
142       "NEW_PRIMARY": self.op.target_node,
143       }
144
145     if instance.disk_template in constants.DTS_INT_MIRROR:
146       env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
147       env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
148     else:
149       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
150
151     env.update(BuildInstanceHookEnvByObject(self, instance))
152
153     return env
154
155   def BuildHooksNodes(self):
156     """Build hooks nodes.
157
158     """
159     instance = self._migrater.instance
160     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
161     return (nl, nl + [instance.primary_node])
162
163
164 class LUInstanceMigrate(LogicalUnit):
165   """Migrate an instance.
166
167   This is migration without shutting down, compared to the failover,
168   which is done with shutdown.
169
170   """
171   HPATH = "instance-migrate"
172   HTYPE = constants.HTYPE_INSTANCE
173   REQ_BGL = False
174
175   def ExpandNames(self):
176     self._ExpandAndLockInstance()
177     _ExpandNamesForMigration(self)
178
179     self._migrater = \
180       TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
181                         self.op.cleanup, False, self.op.allow_failover, False,
182                         self.op.allow_runtime_changes,
183                         constants.DEFAULT_SHUTDOWN_TIMEOUT,
184                         self.op.ignore_ipolicy)
185
186     self.tasklets = [self._migrater]
187
188   def DeclareLocks(self, level):
189     _DeclareLocksForMigration(self, level)
190
191   def BuildHooksEnv(self):
192     """Build hooks env.
193
194     This runs on master, primary and secondary nodes of the instance.
195
196     """
197     instance = self._migrater.instance
198     source_node_uuid = instance.primary_node
199     env = BuildInstanceHookEnvByObject(self, instance)
200     env.update({
201       "MIGRATE_LIVE": self._migrater.live,
202       "MIGRATE_CLEANUP": self.op.cleanup,
203       "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
204       "NEW_PRIMARY": self.op.target_node,
205       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
206       })
207
208     if instance.disk_template in constants.DTS_INT_MIRROR:
209       env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
210       env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
211     else:
212       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
213
214     return env
215
216   def BuildHooksNodes(self):
217     """Build hooks nodes.
218
219     """
220     instance = self._migrater.instance
221     snode_uuids = list(instance.secondary_nodes)
222     nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
223     return (nl, nl)
224
225
226 class TLMigrateInstance(Tasklet):
227   """Tasklet class for instance migration.
228
229   @type live: boolean
230   @ivar live: whether the migration will be done live or non-live;
231       this variable is initalized only after CheckPrereq has run
232   @type cleanup: boolean
233   @ivar cleanup: Wheater we cleanup from a failed migration
234   @type iallocator: string
235   @ivar iallocator: The iallocator used to determine target_node
236   @type target_node_uuid: string
237   @ivar target_node_uuid: If given, the target node UUID to reallocate the
238       instance to
239   @type failover: boolean
240   @ivar failover: Whether operation results in failover or migration
241   @type fallback: boolean
242   @ivar fallback: Whether fallback to failover is allowed if migration not
243                   possible
244   @type ignore_consistency: boolean
245   @ivar ignore_consistency: Wheter we should ignore consistency between source
246                             and target node
247   @type shutdown_timeout: int
248   @ivar shutdown_timeout: In case of failover timeout of the shutdown
249   @type ignore_ipolicy: bool
250   @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
251
252   """
253
254   # Constants
255   _MIGRATION_POLL_INTERVAL = 1      # seconds
256   _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
257
258   def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
259                fallback, ignore_consistency, allow_runtime_changes,
260                shutdown_timeout, ignore_ipolicy):
261     """Initializes this class.
262
263     """
264     Tasklet.__init__(self, lu)
265
266     # Parameters
267     self.instance_uuid = instance_uuid
268     self.instance_name = instance_name
269     self.cleanup = cleanup
270     self.live = False # will be overridden later
271     self.failover = failover
272     self.fallback = fallback
273     self.ignore_consistency = ignore_consistency
274     self.shutdown_timeout = shutdown_timeout
275     self.ignore_ipolicy = ignore_ipolicy
276     self.allow_runtime_changes = allow_runtime_changes
277
278   def CheckPrereq(self):
279     """Check prerequisites.
280
281     This checks that the instance is in the cluster.
282
283     """
284     (self.instance_uuid, self.instance_name) = \
285       ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
286                                 self.instance_name)
287     self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
288     assert self.instance is not None
289     cluster = self.cfg.GetClusterInfo()
290
291     if (not self.cleanup and
292         not self.instance.admin_state == constants.ADMINST_UP and
293         not self.failover and self.fallback):
294       self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
295                       " switching to failover")
296       self.failover = True
297
298     if self.instance.disk_template not in constants.DTS_MIRRORED:
299       if self.failover:
300         text = "failovers"
301       else:
302         text = "migrations"
303       raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
304                                  " %s" % (self.instance.disk_template, text),
305                                  errors.ECODE_STATE)
306
307     if self.instance.disk_template in constants.DTS_EXT_MIRROR:
308       CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
309
310       if self.lu.op.iallocator:
311         assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
312         self._RunAllocator()
313       else:
314         # We set set self.target_node_uuid as it is required by
315         # BuildHooksEnv
316         self.target_node_uuid = self.lu.op.target_node_uuid
317
318       # Check that the target node is correct in terms of instance policy
319       nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
320       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
321       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
322                                                               group_info)
323       CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
324                              self.cfg, ignore=self.ignore_ipolicy)
325
326       # self.target_node is already populated, either directly or by the
327       # iallocator run
328       target_node_uuid = self.target_node_uuid
329       if self.target_node_uuid == self.instance.primary_node:
330         raise errors.OpPrereqError(
331           "Cannot migrate instance %s to its primary (%s)" %
332           (self.instance.name,
333            self.cfg.GetNodeName(self.instance.primary_node)),
334           errors.ECODE_STATE)
335
336       if len(self.lu.tasklets) == 1:
337         # It is safe to release locks only when we're the only tasklet
338         # in the LU
339         ReleaseLocks(self.lu, locking.LEVEL_NODE,
340                      keep=[self.instance.primary_node, self.target_node_uuid])
341         ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
342
343     else:
344       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
345
346       secondary_node_uuids = self.instance.secondary_nodes
347       if not secondary_node_uuids:
348         raise errors.ConfigurationError("No secondary node but using"
349                                         " %s disk template" %
350                                         self.instance.disk_template)
351       target_node_uuid = secondary_node_uuids[0]
352       if self.lu.op.iallocator or \
353         (self.lu.op.target_node_uuid and
354          self.lu.op.target_node_uuid != target_node_uuid):
355         if self.failover:
356           text = "failed over"
357         else:
358           text = "migrated"
359         raise errors.OpPrereqError("Instances with disk template %s cannot"
360                                    " be %s to arbitrary nodes"
361                                    " (neither an iallocator nor a target"
362                                    " node can be passed)" %
363                                    (self.instance.disk_template, text),
364                                    errors.ECODE_INVAL)
365       nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
366       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
367       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
368                                                               group_info)
369       CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
370                              self.cfg, ignore=self.ignore_ipolicy)
371
372     i_be = cluster.FillBE(self.instance)
373
374     # check memory requirements on the secondary node
375     if (not self.cleanup and
376          (not self.failover or
377            self.instance.admin_state == constants.ADMINST_UP)):
378       self.tgt_free_mem = CheckNodeFreeMemory(
379           self.lu, target_node_uuid,
380           "migrating instance %s" % self.instance.name,
381           i_be[constants.BE_MINMEM], self.instance.hypervisor,
382           self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
383     else:
384       self.lu.LogInfo("Not checking memory on the secondary node as"
385                       " instance will not be started")
386
387     # check if failover must be forced instead of migration
388     if (not self.cleanup and not self.failover and
389         i_be[constants.BE_ALWAYS_FAILOVER]):
390       self.lu.LogInfo("Instance configured to always failover; fallback"
391                       " to failover")
392       self.failover = True
393
394     # check bridge existance
395     CheckInstanceBridgesExist(self.lu, self.instance,
396                               node_uuid=target_node_uuid)
397
398     if not self.cleanup:
399       CheckNodeNotDrained(self.lu, target_node_uuid)
400       if not self.failover:
401         result = self.rpc.call_instance_migratable(self.instance.primary_node,
402                                                    self.instance)
403         if result.fail_msg and self.fallback:
404           self.lu.LogInfo("Can't migrate, instance offline, fallback to"
405                           " failover")
406           self.failover = True
407         else:
408           result.Raise("Can't migrate, please use failover",
409                        prereq=True, ecode=errors.ECODE_STATE)
410
411     assert not (self.failover and self.cleanup)
412
413     if not self.failover:
414       if self.lu.op.live is not None and self.lu.op.mode is not None:
415         raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
416                                    " parameters are accepted",
417                                    errors.ECODE_INVAL)
418       if self.lu.op.live is not None:
419         if self.lu.op.live:
420           self.lu.op.mode = constants.HT_MIGRATION_LIVE
421         else:
422           self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
423         # reset the 'live' parameter to None so that repeated
424         # invocations of CheckPrereq do not raise an exception
425         self.lu.op.live = None
426       elif self.lu.op.mode is None:
427         # read the default value from the hypervisor
428         i_hv = cluster.FillHV(self.instance, skip_globals=False)
429         self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
430
431       self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
432     else:
433       # Failover is never live
434       self.live = False
435
436     if not (self.failover or self.cleanup):
437       remote_info = self.rpc.call_instance_info(
438           self.instance.primary_node, self.instance.name,
439           self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
440       remote_info.Raise("Error checking instance on node %s" %
441                         self.cfg.GetNodeName(self.instance.primary_node))
442       instance_running = bool(remote_info.payload)
443       if instance_running:
444         self.current_mem = int(remote_info.payload["memory"])
445
446   def _RunAllocator(self):
447     """Run the allocator based on input opcode.
448
449     """
450     assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
451
452     # FIXME: add a self.ignore_ipolicy option
453     req = iallocator.IAReqRelocate(
454           inst_uuid=self.instance_uuid,
455           relocate_from_node_uuids=[self.instance.primary_node])
456     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
457
458     ial.Run(self.lu.op.iallocator)
459
460     if not ial.success:
461       raise errors.OpPrereqError("Can't compute nodes using"
462                                  " iallocator '%s': %s" %
463                                  (self.lu.op.iallocator, ial.info),
464                                  errors.ECODE_NORES)
465     self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
466     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
467                     self.instance_name, self.lu.op.iallocator,
468                     utils.CommaJoin(ial.result))
469
470   def _WaitUntilSync(self):
471     """Poll with custom rpc for disk sync.
472
473     This uses our own step-based rpc call.
474
475     """
476     self.feedback_fn("* wait until resync is done")
477     all_done = False
478     while not all_done:
479       all_done = True
480       result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
481                                             self.nodes_ip,
482                                             (self.instance.disks,
483                                              self.instance))
484       min_percent = 100
485       for node_uuid, nres in result.items():
486         nres.Raise("Cannot resync disks on node %s" %
487                    self.cfg.GetNodeName(node_uuid))
488         node_done, node_percent = nres.payload
489         all_done = all_done and node_done
490         if node_percent is not None:
491           min_percent = min(min_percent, node_percent)
492       if not all_done:
493         if min_percent < 100:
494           self.feedback_fn("   - progress: %.1f%%" % min_percent)
495         time.sleep(2)
496
497   def _EnsureSecondary(self, node_uuid):
498     """Demote a node to secondary.
499
500     """
501     self.feedback_fn("* switching node %s to secondary mode" %
502                      self.cfg.GetNodeName(node_uuid))
503
504     for dev in self.instance.disks:
505       self.cfg.SetDiskID(dev, node_uuid)
506
507     result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
508                                           self.instance.disks)
509     result.Raise("Cannot change disk to secondary on node %s" %
510                  self.cfg.GetNodeName(node_uuid))
511
512   def _GoStandalone(self):
513     """Disconnect from the network.
514
515     """
516     self.feedback_fn("* changing into standalone mode")
517     result = self.rpc.call_drbd_disconnect_net(self.all_node_uuids,
518                                                self.nodes_ip,
519                                                self.instance.disks)
520     for node_uuid, nres in result.items():
521       nres.Raise("Cannot disconnect disks node %s" %
522                  self.cfg.GetNodeName(node_uuid))
523
524   def _GoReconnect(self, multimaster):
525     """Reconnect to the network.
526
527     """
528     if multimaster:
529       msg = "dual-master"
530     else:
531       msg = "single-master"
532     self.feedback_fn("* changing disks into %s mode" % msg)
533     result = self.rpc.call_drbd_attach_net(self.all_node_uuids, self.nodes_ip,
534                                            (self.instance.disks, self.instance),
535                                            self.instance.name, multimaster)
536     for node_uuid, nres in result.items():
537       nres.Raise("Cannot change disks config on node %s" %
538                  self.cfg.GetNodeName(node_uuid))
539
540   def _ExecCleanup(self):
541     """Try to cleanup after a failed migration.
542
543     The cleanup is done by:
544       - check that the instance is running only on one node
545         (and update the config if needed)
546       - change disks on its secondary node to secondary
547       - wait until disks are fully synchronized
548       - disconnect from the network
549       - change disks into single-master mode
550       - wait again until disks are fully synchronized
551
552     """
553     # check running on only one node
554     self.feedback_fn("* checking where the instance actually runs"
555                      " (if this hangs, the hypervisor might be in"
556                      " a bad state)")
557     cluster_hvparams = self.cfg.GetClusterInfo().hvparams
558     ins_l = self.rpc.call_instance_list(self.all_node_uuids,
559                                         [self.instance.hypervisor],
560                                         cluster_hvparams)
561     for node_uuid, result in ins_l.items():
562       result.Raise("Can't contact node %s" % node_uuid)
563
564     runningon_source = self.instance.name in \
565                          ins_l[self.source_node_uuid].payload
566     runningon_target = self.instance.name in \
567                          ins_l[self.target_node_uuid].payload
568
569     if runningon_source and runningon_target:
570       raise errors.OpExecError("Instance seems to be running on two nodes,"
571                                " or the hypervisor is confused; you will have"
572                                " to ensure manually that it runs only on one"
573                                " and restart this operation")
574
575     if not (runningon_source or runningon_target):
576       raise errors.OpExecError("Instance does not seem to be running at all;"
577                                " in this case it's safer to repair by"
578                                " running 'gnt-instance stop' to ensure disk"
579                                " shutdown, and then restarting it")
580
581     if runningon_target:
582       # the migration has actually succeeded, we need to update the config
583       self.feedback_fn("* instance running on secondary node (%s),"
584                        " updating config" %
585                        self.cfg.GetNodeName(self.target_node_uuid))
586       self.instance.primary_node = self.target_node_uuid
587       self.cfg.Update(self.instance, self.feedback_fn)
588       demoted_node_uuid = self.source_node_uuid
589     else:
590       self.feedback_fn("* instance confirmed to be running on its"
591                        " primary node (%s)" %
592                        self.cfg.GetNodeName(self.source_node_uuid))
593       demoted_node_uuid = self.target_node_uuid
594
595     if self.instance.disk_template in constants.DTS_INT_MIRROR:
596       self._EnsureSecondary(demoted_node_uuid)
597       try:
598         self._WaitUntilSync()
599       except errors.OpExecError:
600         # we ignore here errors, since if the device is standalone, it
601         # won't be able to sync
602         pass
603       self._GoStandalone()
604       self._GoReconnect(False)
605       self._WaitUntilSync()
606
607     self.feedback_fn("* done")
608
609   def _RevertDiskStatus(self):
610     """Try to revert the disk status after a failed migration.
611
612     """
613     if self.instance.disk_template in constants.DTS_EXT_MIRROR:
614       return
615
616     try:
617       self._EnsureSecondary(self.target_node_uuid)
618       self._GoStandalone()
619       self._GoReconnect(False)
620       self._WaitUntilSync()
621     except errors.OpExecError, err:
622       self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
623                          " please try to recover the instance manually;"
624                          " error '%s'" % str(err))
625
626   def _AbortMigration(self):
627     """Call the hypervisor code to abort a started migration.
628
629     """
630     abort_result = self.rpc.call_instance_finalize_migration_dst(
631                      self.target_node_uuid, self.instance, self.migration_info,
632                      False)
633     abort_msg = abort_result.fail_msg
634     if abort_msg:
635       logging.error("Aborting migration failed on target node %s: %s",
636                     self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
637       # Don't raise an exception here, as we stil have to try to revert the
638       # disk status, even if this step failed.
639
640     abort_result = self.rpc.call_instance_finalize_migration_src(
641       self.source_node_uuid, self.instance, False, self.live)
642     abort_msg = abort_result.fail_msg
643     if abort_msg:
644       logging.error("Aborting migration failed on source node %s: %s",
645                     self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
646
647   def _ExecMigration(self):
648     """Migrate an instance.
649
650     The migrate is done by:
651       - change the disks into dual-master mode
652       - wait until disks are fully synchronized again
653       - migrate the instance
654       - change disks on the new secondary node (the old primary) to secondary
655       - wait until disks are fully synchronized
656       - change disks into single-master mode
657
658     """
659     # Check for hypervisor version mismatch and warn the user.
660     hvspecs = [(self.instance.hypervisor,
661                 self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
662     nodeinfo = self.rpc.call_node_info(
663                  [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
664     for ninfo in nodeinfo.values():
665       ninfo.Raise("Unable to retrieve node information from node '%s'" %
666                   ninfo.node)
667     (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
668     (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload
669
670     if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
671         (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
672       src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
673       dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
674       if src_version != dst_version:
675         self.feedback_fn("* warning: hypervisor version mismatch between"
676                          " source (%s) and target (%s) node" %
677                          (src_version, dst_version))
678
679     self.feedback_fn("* checking disk consistency between source and target")
680     for (idx, dev) in enumerate(self.instance.disks):
681       if not CheckDiskConsistency(self.lu, self.instance, dev,
682                                   self.target_node_uuid,
683                                   False):
684         raise errors.OpExecError("Disk %s is degraded or not fully"
685                                  " synchronized on target node,"
686                                  " aborting migration" % idx)
687
688     if self.current_mem > self.tgt_free_mem:
689       if not self.allow_runtime_changes:
690         raise errors.OpExecError("Memory ballooning not allowed and not enough"
691                                  " free memory to fit instance %s on target"
692                                  " node %s (have %dMB, need %dMB)" %
693                                  (self.instance.name,
694                                   self.cfg.GetNodeName(self.target_node_uuid),
695                                   self.tgt_free_mem, self.current_mem))
696       self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
697       rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
698                                                      self.instance,
699                                                      self.tgt_free_mem)
700       rpcres.Raise("Cannot modify instance runtime memory")
701
702     # First get the migration information from the remote node
703     result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
704     msg = result.fail_msg
705     if msg:
706       log_err = ("Failed fetching source migration information from %s: %s" %
707                  (self.cfg.GetNodeName(self.source_node_uuid), msg))
708       logging.error(log_err)
709       raise errors.OpExecError(log_err)
710
711     self.migration_info = migration_info = result.payload
712
713     if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
714       # Then switch the disks to master/master mode
715       self._EnsureSecondary(self.target_node_uuid)
716       self._GoStandalone()
717       self._GoReconnect(True)
718       self._WaitUntilSync()
719
720     self.feedback_fn("* preparing %s to accept the instance" %
721                      self.cfg.GetNodeName(self.target_node_uuid))
722     result = self.rpc.call_accept_instance(self.target_node_uuid,
723                                            self.instance,
724                                            migration_info,
725                                            self.nodes_ip[self.target_node_uuid])
726
727     msg = result.fail_msg
728     if msg:
729       logging.error("Instance pre-migration failed, trying to revert"
730                     " disk status: %s", msg)
731       self.feedback_fn("Pre-migration failed, aborting")
732       self._AbortMigration()
733       self._RevertDiskStatus()
734       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
735                                (self.instance.name, msg))
736
737     self.feedback_fn("* migrating instance to %s" %
738                      self.cfg.GetNodeName(self.target_node_uuid))
739     cluster = self.cfg.GetClusterInfo()
740     result = self.rpc.call_instance_migrate(
741         self.source_node_uuid, cluster.cluster_name, self.instance,
742         self.nodes_ip[self.target_node_uuid], self.live)
743     msg = result.fail_msg
744     if msg:
745       logging.error("Instance migration failed, trying to revert"
746                     " disk status: %s", msg)
747       self.feedback_fn("Migration failed, aborting")
748       self._AbortMigration()
749       self._RevertDiskStatus()
750       raise errors.OpExecError("Could not migrate instance %s: %s" %
751                                (self.instance.name, msg))
752
753     self.feedback_fn("* starting memory transfer")
754     last_feedback = time.time()
755     while True:
756       result = self.rpc.call_instance_get_migration_status(
757                  self.source_node_uuid, self.instance)
758       msg = result.fail_msg
759       ms = result.payload   # MigrationStatus instance
760       if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
761         logging.error("Instance migration failed, trying to revert"
762                       " disk status: %s", msg)
763         self.feedback_fn("Migration failed, aborting")
764         self._AbortMigration()
765         self._RevertDiskStatus()
766         if not msg:
767           msg = "hypervisor returned failure"
768         raise errors.OpExecError("Could not migrate instance %s: %s" %
769                                  (self.instance.name, msg))
770
771       if result.payload.status != constants.HV_MIGRATION_ACTIVE:
772         self.feedback_fn("* memory transfer complete")
773         break
774
775       if (utils.TimeoutExpired(last_feedback,
776                                self._MIGRATION_FEEDBACK_INTERVAL) and
777           ms.transferred_ram is not None):
778         mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
779         self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
780         last_feedback = time.time()
781
782       time.sleep(self._MIGRATION_POLL_INTERVAL)
783
784     result = self.rpc.call_instance_finalize_migration_src(
785                self.source_node_uuid, self.instance, True, self.live)
786     msg = result.fail_msg
787     if msg:
788       logging.error("Instance migration succeeded, but finalization failed"
789                     " on the source node: %s", msg)
790       raise errors.OpExecError("Could not finalize instance migration: %s" %
791                                msg)
792
793     self.instance.primary_node = self.target_node_uuid
794
795     # distribute new instance config to the other nodes
796     self.cfg.Update(self.instance, self.feedback_fn)
797
798     result = self.rpc.call_instance_finalize_migration_dst(
799                self.target_node_uuid, self.instance, migration_info, True)
800     msg = result.fail_msg
801     if msg:
802       logging.error("Instance migration succeeded, but finalization failed"
803                     " on the target node: %s", msg)
804       raise errors.OpExecError("Could not finalize instance migration: %s" %
805                                msg)
806
807     if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
808       self._EnsureSecondary(self.source_node_uuid)
809       self._WaitUntilSync()
810       self._GoStandalone()
811       self._GoReconnect(False)
812       self._WaitUntilSync()
813
814     # If the instance's disk template is `rbd' or `ext' and there was a
815     # successful migration, unmap the device from the source node.
816     if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
817       disks = ExpandCheckDisks(self.instance, self.instance.disks)
818       self.feedback_fn("* unmapping instance's disks from %s" %
819                        self.cfg.GetNodeName(self.source_node_uuid))
820       for disk in disks:
821         result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
822                                                  (disk, self.instance))
823         msg = result.fail_msg
824         if msg:
825           logging.error("Migration was successful, but couldn't unmap the"
826                         " block device %s on source node %s: %s",
827                         disk.iv_name,
828                         self.cfg.GetNodeName(self.source_node_uuid), msg)
829           logging.error("You need to unmap the device %s manually on %s",
830                         disk.iv_name,
831                         self.cfg.GetNodeName(self.source_node_uuid))
832
833     self.feedback_fn("* done")
834
835   def _ExecFailover(self):
836     """Failover an instance.
837
838     The failover is done by shutting it down on its present node and
839     starting it on the secondary.
840
841     """
842     primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)
843
844     source_node_uuid = self.instance.primary_node
845
846     if self.instance.disks_active:
847       self.feedback_fn("* checking disk consistency between source and target")
848       for (idx, dev) in enumerate(self.instance.disks):
849         # for drbd, these are drbd over lvm
850         if not CheckDiskConsistency(self.lu, self.instance, dev,
851                                     self.target_node_uuid, False):
852           if primary_node.offline:
853             self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
854                              " target node %s" %
855                              (primary_node.name, idx,
856                               self.cfg.GetNodeName(self.target_node_uuid)))
857           elif not self.ignore_consistency:
858             raise errors.OpExecError("Disk %s is degraded on target node,"
859                                      " aborting failover" % idx)
860     else:
861       self.feedback_fn("* not checking disk consistency as instance is not"
862                        " running")
863
864     self.feedback_fn("* shutting down instance on source node")
865     logging.info("Shutting down instance %s on node %s",
866                  self.instance.name, self.cfg.GetNodeName(source_node_uuid))
867
868     result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
869                                              self.shutdown_timeout,
870                                              self.lu.op.reason)
871     msg = result.fail_msg
872     if msg:
873       if self.ignore_consistency or primary_node.offline:
874         self.lu.LogWarning("Could not shutdown instance %s on node %s,"
875                            " proceeding anyway; please make sure node"
876                            " %s is down; error details: %s",
877                            self.instance.name,
878                            self.cfg.GetNodeName(source_node_uuid),
879                            self.cfg.GetNodeName(source_node_uuid), msg)
880       else:
881         raise errors.OpExecError("Could not shutdown instance %s on"
882                                  " node %s: %s" %
883                                  (self.instance.name,
884                                   self.cfg.GetNodeName(source_node_uuid), msg))
885
886     self.feedback_fn("* deactivating the instance's disks on source node")
887     if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
888       raise errors.OpExecError("Can't shut down the instance's disks")
889
890     self.instance.primary_node = self.target_node_uuid
891     # distribute new instance config to the other nodes
892     self.cfg.Update(self.instance, self.feedback_fn)
893
894     # Only start the instance if it's marked as up
895     if self.instance.admin_state == constants.ADMINST_UP:
896       self.feedback_fn("* activating the instance's disks on target node %s" %
897                        self.cfg.GetNodeName(self.target_node_uuid))
898       logging.info("Starting instance %s on node %s", self.instance.name,
899                    self.cfg.GetNodeName(self.target_node_uuid))
900
901       disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
902                                           ignore_secondaries=True)
903       if not disks_ok:
904         ShutdownInstanceDisks(self.lu, self.instance)
905         raise errors.OpExecError("Can't activate the instance's disks")
906
907       self.feedback_fn("* starting the instance on the target node %s" %
908                        self.cfg.GetNodeName(self.target_node_uuid))
909       result = self.rpc.call_instance_start(self.target_node_uuid,
910                                             (self.instance, None, None), False,
911                                             self.lu.op.reason)
912       msg = result.fail_msg
913       if msg:
914         ShutdownInstanceDisks(self.lu, self.instance)
915         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
916                                  (self.instance.name,
917                                   self.cfg.GetNodeName(self.target_node_uuid),
918                                   msg))
919
920   def Exec(self, feedback_fn):
921     """Perform the migration.
922
923     """
924     self.feedback_fn = feedback_fn
925     self.source_node_uuid = self.instance.primary_node
926
927     # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
928     if self.instance.disk_template in constants.DTS_INT_MIRROR:
929       self.target_node_uuid = self.instance.secondary_nodes[0]
930       # Otherwise self.target_node has been populated either
931       # directly, or through an iallocator.
932
933     self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
934     self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
935                          in self.cfg.GetMultiNodeInfo(self.all_node_uuids))
936
937     if self.failover:
938       feedback_fn("Failover instance %s" % self.instance.name)
939       self._ExecFailover()
940     else:
941       feedback_fn("Migrating instance %s" % self.instance.name)
942
943       if self.cleanup:
944         return self._ExecCleanup()
945       else:
946         return self._ExecMigration()