lib/cmdlib/instance_migration.py @ bc0a2284
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceName, \
  CheckIAllocatorOrNode, ExpandNodeName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance

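# This module provides two thin LogicalUnits, LUInstanceFailover and
# LUInstanceMigrate, which share the name-expansion and lock-declaration
# helpers below and delegate the actual work to the TLMigrateInstance
# tasklet defined at the end of the file.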
def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []

def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])

class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])

class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting the instance down, as opposed to
  failover, which is done with a shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)

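# Note: the LogicalUnits above construct TLMigrateInstance with positional
# boolean flags; per __init__ below, the argument order is
# (lu, instance_name, cleanup, failover, fallback, ignore_consistency,
#  allow_runtime_changes, shutdown_timeout, ignore_ipolicy).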
class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: Timeout for the instance shutdown in case of failover
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_name, cleanup, failover, fallback,
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
               ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
    instance = self.cfg.GetInstanceInfo(instance_name)
    assert instance is not None
    self.instance = instance
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (instance.disk_template, text),
                                 errors.ECODE_STATE)

    if instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node = self.target_node
      if self.target_node == instance.primary_node:
        raise errors.OpPrereqError("Cannot migrate instance %s"
                                   " to its primary (%s)" %
                                   (instance.name, instance.primary_node),
                                   errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[instance.primary_node, self.target_node])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_nodes = instance.secondary_nodes
      if not secondary_nodes:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        instance.disk_template)
      target_node = secondary_nodes[0]
      if self.lu.op.iallocator or (self.lu.op.target_node and
                                   self.lu.op.target_node != target_node):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
                             ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node, "migrating instance %s" % instance.name,
          i_be[constants.BE_MINMEM], instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, instance, node=target_node)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node)
      if not self.failover:
        result = self.rpc.call_instance_migratable(instance.primary_node,
                                                   instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters is accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          instance.primary_node, instance.name, instance.hypervisor,
          cluster.hvparams[instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        instance.primary_node)
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

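  # _RunAllocator is only reached from CheckPrereq, for externally mirrored
  # disk templates (DTS_EXT_MIRROR) when an iallocator was requested instead
  # of an explicit target node.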
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(name=self.instance_name,
                                   relocate_from=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node = ial.result[0]
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

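  # The helpers below (_WaitUntilSync, _EnsureSecondary, _GoStandalone and
  # _GoReconnect) step the DRBD disks through the secondary, standalone and
  # single/dual-master states; they are used by _ExecMigration, _ExecCleanup
  # and _RevertDiskStatus for internally mirrored disk templates only.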
  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks on node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor],
                                        cluster_hvparams)
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance, self.feedback_fn)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    if instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                                 instance,
                                                                 migration_info,
                                                                 False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    target_node, abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      source_node, instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    source_node, abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
                                       None, hvspecs, False)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[source_node].payload
    (_, _, (dst_info, )) = nodeinfo[target_node].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(instance.disks):
      if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (instance.name, target_node,
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
                                                     instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        source_node, cluster.cluster_name, instance, self.nodes_ip[target_node],
        self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(source_node,
                                                           instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(source_node,
                                                           instance,
                                                           True,
                                                           self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    instance.primary_node = target_node

    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting the instance down on its present node
    and starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

    if instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, instance, dev, target_node,
                                    False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()
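
# Rough invocation sketch (assuming the standard Ganeti opcode dispatch, which
# is not shown in this file):
#   gnt-instance migrate   -> OpInstanceMigrate  -> LUInstanceMigrate
#   gnt-instance failover  -> OpInstanceFailover -> LUInstanceFailover
# Both paths end in TLMigrateInstance.Exec, which dispatches to
# _ExecMigration, _ExecFailover or, for "gnt-instance migrate --cleanup",
# _ExecCleanup.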