# lib/cmdlib/instance_migration.py @ 8ac806e6
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""
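
# The module provides two logical units, LUInstanceFailover and
# LUInstanceMigrate; both are thin wrappers around the TLMigrateInstance
# tasklet defined at the bottom of this file.
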
import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)
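
    # TLMigrateInstance's positional arguments after the instance name are
    # (cleanup, failover, fallback, ignore_consistency,
    # allow_runtime_changes, shutdown_timeout, ignore_ipolicy); a failover
    # is requested outright, with no cleanup and no fallback, while
    # runtime changes stay allowed.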
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting the instance down, as opposed to
  failover, which requires it to be shut down.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)
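
    # Same positional arguments as above: here cleanup and fallback come
    # from the opcode (cleanup=self.op.cleanup,
    # fallback=self.op.allow_failover), failover and ignore_consistency
    # are off, and the default shutdown timeout covers a possible
    # fallback to failover.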
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration is
                  not possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between
                            source and target nodes
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, timeout for the instance
                          shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
                                              "migrating instance %s" %
                                              instance.name,
                                              i_be[constants.BE_MINMEM],
                                              instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters is accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])
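
    # current_mem and tgt_free_mem are used by _ExecMigration to decide
    # whether the instance's runtime memory has to be ballooned down
    # before the transfer starts.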

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks on node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor],
                                        cluster_hvparams)
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info
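
    # Aborting means finalizing the migration on both nodes with the
    # success flag set to False; errors are only logged, so that the
    # disk status can still be reverted afterwards.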
    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload
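
    # Externally mirrored disks (e.g. sharedfile or RBD) are accessible
    # from both nodes already; only internally mirrored (DRBD) disks have
    # to be switched to dual-master mode for the live migration below.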
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
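    # Poll the hypervisor every _MIGRATION_POLL_INTERVAL seconds, but
    # report progress at most every _MIGRATION_FEEDBACK_INTERVAL seconds
    # to keep the job feedback readable.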
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)
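
    # Note the sequence: finalize on the source, commit the new primary
    # node to the configuration, then finalize on the target.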
    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shut down instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shut down instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
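    # The mapping above feeds the DRBD and migration RPCs, which address
    # the nodes via their secondary IPs, i.e. over the replication
    # network.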

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()