lib/cmdlib/instance_migration.py @ f6d4260c

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, True,
                        False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
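    # Positional arguments above map to TLMigrateInstance.__init__: cleanup,
    # failover=True, fallback=False, ignore_consistency,
    # allow_runtime_changes=True, shutdown_timeout, ignore_ipolicy.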

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, as opposed to failover,
  which is done with a shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)
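    # Positional arguments above map to TLMigrateInstance.__init__: cleanup,
    # failover=False, fallback=self.op.allow_failover, ignore_consistency=False,
    # allow_runtime_changes; the shutdown timeout is only used on the failover
    # path, hence the default value here.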

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
                                              "migrating instance %s" %
                                              instance.name,
                                              i_be[constants.BE_MINMEM],
                                              instance.hypervisor)
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])
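        # current_mem is compared with tgt_free_mem in _ExecMigration to decide
        # whether the instance must be ballooned down before the migration.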

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, [self.instance.hypervisor], False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")
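      # The instance has just been ballooned down to tgt_free_mem, so it now
      # fits into the free memory of the target node.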

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
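    # nodes_ip maps node names to their secondary IPs; the DRBD and migration
    # RPC calls issued by the helper methods above use these addresses.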

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()