lib/cmdlib/instance_migration.py @ ce81990d

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance
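
# This module backs the "gnt-instance migrate" and "gnt-instance failover"
# commands: LUInstanceMigrate and LUInstanceFailover are thin wrappers that
# delegate the actual work to the TLMigrateInstance tasklet defined below.
# A rough sketch of how the migrate path is reached from the opcode side
# (assuming the standard ganeti.opcodes interface; names outside this file
# are illustrative only):
#
#   from ganeti import opcodes
#   op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
#                                  mode=constants.HT_MIGRATION_LIVE)
#   # submitted to the master daemon, this opcode is handled by
#   # LUInstanceMigrate below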


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
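
# Locking happens in two stages: _ExpandNamesForMigration runs during
# ExpandNames and leaves the node lock lists empty, and _DeclareLocksForMigration
# fills them in per level once the instance lock is held.  For externally
# mirrored disk templates the node locks are determined at LEVEL_NODE_ALLOC
# (either the explicit target node or ALL_SET when an iallocator will pick
# one); LEVEL_NODE_RES then simply copies the node locks.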


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, True,
                        False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self._migrater.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    nl.append(self._migrater.target_node)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self._migrater.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    nl.append(self._migrater.target_node)
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
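
  # The two intervals above drive the progress loop in _ExecMigration: the
  # hypervisor migration status is polled every _MIGRATION_POLL_INTERVAL
  # seconds, while a "memory transfer progress" message is emitted at most
  # every _MIGRATION_FEEDBACK_INTERVAL seconds.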

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      self.target_node = target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
                                              "migrating instance %s" %
                                              instance.name,
                                              i_be[constants.BE_MINMEM],
                                              instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters is accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False
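
    # Live vs. non-live has now been resolved: an explicit 'live' flag is
    # translated into the corresponding 'mode', an explicit 'mode' is used
    # as-is, and otherwise the hypervisor's HV_MIGRATION_MODE default applies;
    # failovers are always treated as non-live.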

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)
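
  # _EnsureSecondary, _GoStandalone and _GoReconnect are the DRBD building
  # blocks used below: a live migration briefly puts the disks into
  # dual-master mode (_GoStandalone followed by _GoReconnect(True)) so both
  # nodes can write, and once the instance has moved the old primary is
  # demoted again (_EnsureSecondary plus _GoReconnect(False)).  _ExecCleanup
  # and _RevertDiskStatus reuse the same sequence to get back to a consistent
  # single-master setup after a failed or interrupted migration.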

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")
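
    # tgt_free_mem was computed in CheckPrereq (via CheckNodeFreeMemory, using
    # the instance's BE_MINMEM), while current_mem is the runtime memory
    # reported by the hypervisor.  If the target cannot hold the current
    # runtime memory, the instance is ballooned down before the transfer,
    # provided runtime changes are allowed.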
694

    
695
    # First get the migration information from the remote node
696
    result = self.rpc.call_migration_info(source_node, instance)
697
    msg = result.fail_msg
698
    if msg:
699
      log_err = ("Failed fetching source migration information from %s: %s" %
700
                 (source_node, msg))
701
      logging.error(log_err)
702
      raise errors.OpExecError(log_err)
703

    
704
    self.migration_info = migration_info = result.payload
705

    
706
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
707
      # Then switch the disks to master/master mode
708
      self._EnsureSecondary(target_node)
709
      self._GoStandalone()
710
      self._GoReconnect(True)
711
      self._WaitUntilSync()
712

    
713
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
714
    # This fills physical_id slot that may be missing on newly created disks
715
    for disk in instance.disks:
716
      self.cfg.SetDiskID(disk, target_node)
717
    result = self.rpc.call_accept_instance(target_node,
718
                                           instance,
719
                                           migration_info,
720
                                           self.nodes_ip[target_node])
721

    
722
    msg = result.fail_msg
723
    if msg:
724
      logging.error("Instance pre-migration failed, trying to revert"
725
                    " disk status: %s", msg)
726
      self.feedback_fn("Pre-migration failed, aborting")
727
      self._AbortMigration()
728
      self._RevertDiskStatus()
729
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
730
                               (instance.name, msg))
731

    
732
    self.feedback_fn("* migrating instance to %s" % target_node)
733
    result = self.rpc.call_instance_migrate(source_node, instance,
734
                                            self.nodes_ip[target_node],
735
                                            self.live)
736
    msg = result.fail_msg
737
    if msg:
738
      logging.error("Instance migration failed, trying to revert"
739
                    " disk status: %s", msg)
740
      self.feedback_fn("Migration failed, aborting")
741
      self._AbortMigration()
742
      self._RevertDiskStatus()
743
      raise errors.OpExecError("Could not migrate instance %s: %s" %
744
                               (instance.name, msg))
745

    
746
    self.feedback_fn("* starting memory transfer")
747
    last_feedback = time.time()
748
    while True:
749
      result = self.rpc.call_instance_get_migration_status(source_node,
750
                                                           instance)
751
      msg = result.fail_msg
752
      ms = result.payload   # MigrationStatus instance
753
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
754
        logging.error("Instance migration failed, trying to revert"
755
                      " disk status: %s", msg)
756
        self.feedback_fn("Migration failed, aborting")
757
        self._AbortMigration()
758
        self._RevertDiskStatus()
759
        if not msg:
760
          msg = "hypervisor returned failure"
761
        raise errors.OpExecError("Could not migrate instance %s: %s" %
762
                                 (instance.name, msg))
763

    
764
      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
765
        self.feedback_fn("* memory transfer complete")
766
        break
767

    
768
      if (utils.TimeoutExpired(last_feedback,
769
                               self._MIGRATION_FEEDBACK_INTERVAL) and
770
          ms.transferred_ram is not None):
771
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
772
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
773
        last_feedback = time.time()
774

    
775
      time.sleep(self._MIGRATION_POLL_INTERVAL)
776

    
777
    result = self.rpc.call_instance_finalize_migration_src(source_node,
778
                                                           instance,
779
                                                           True,
780
                                                           self.live)
781
    msg = result.fail_msg
782
    if msg:
783
      logging.error("Instance migration succeeded, but finalization failed"
784
                    " on the source node: %s", msg)
785
      raise errors.OpExecError("Could not finalize instance migration: %s" %
786
                               msg)
787

    
788
    instance.primary_node = target_node
789

    
790
    # distribute new instance config to the other nodes
791
    self.cfg.Update(instance, self.feedback_fn)
792

    
793
    result = self.rpc.call_instance_finalize_migration_dst(target_node,
794
                                                           instance,
795
                                                           migration_info,
796
                                                           True)
797
    msg = result.fail_msg
798
    if msg:
799
      logging.error("Instance migration succeeded, but finalization failed"
800
                    " on the target node: %s", msg)
801
      raise errors.OpExecError("Could not finalize instance migration: %s" %
802
                               msg)
803

    
804
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
805
      self._EnsureSecondary(source_node)
806
      self._WaitUntilSync()
807
      self._GoStandalone()
808
      self._GoReconnect(False)
809
      self._WaitUntilSync()
810

    
811
    # If the instance's disk template is `rbd' or `ext' and there was a
812
    # successful migration, unmap the device from the source node.
813
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
814
      disks = ExpandCheckDisks(instance, instance.disks)
815
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
816
      for disk in disks:
817
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
818
        msg = result.fail_msg
819
        if msg:
820
          logging.error("Migration was successful, but couldn't unmap the"
821
                        " block device %s on source node %s: %s",
822
                        disk.iv_name, source_node, msg)
823
          logging.error("You need to unmap the device %s manually on %s",
824
                        disk.iv_name, source_node)
825

    
826
    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
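
    # all_nodes and nodes_ip (a mapping from node name to its secondary IP)
    # are the inputs consumed by the DRBD helpers above (_WaitUntilSync,
    # _GoStandalone, _GoReconnect), so they are prepared here before
    # dispatching to the failover, cleanup or migration path.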

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()