lib/cmdlib/instance_migration.py @ 896cc964

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with instance migration an failover."""
23

    
24
import logging
25
import time
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti.masterd import iallocator
31
from ganeti import utils
32
from ganeti.cmdlib.base import LogicalUnit, Tasklet
33
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
34
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
35
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
36
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
37
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
38
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
39
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
40

    
41
import ganeti.masterd.instance
42

    
43

    
44
def _ExpandNamesForMigration(lu):
45
  """Expands names for use with L{TLMigrateInstance}.
46

47
  @type lu: L{LogicalUnit}
48

49
  """
50
  if lu.op.target_node is not None:
51
    (lu.op.target_node_uuid, lu.op.target_node) = \
52
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
53

    
54
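  # The node locks cannot be computed yet (the instance's nodes are only
  # known once the instance lock is held), so start with an empty set and
  # let DeclareLocks fill it in per instance.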
  lu.needed_locks[locking.LEVEL_NODE] = []
55
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
56

    
57
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
58
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
59

    
60
  # The node allocation lock is actually only needed for externally replicated
61
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
62
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
63

    
64

    
65
def _DeclareLocksForMigration(lu, level):
66
  """Declares locks for L{TLMigrateInstance}.
67

68
  @type lu: L{LogicalUnit}
69
  @param level: Lock level
70

71
  """
72
  if level == locking.LEVEL_NODE_ALLOC:
73
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
74

    
75
    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)
76

    
77
    # Node locks are already declared here rather than at LEVEL_NODE as we need
78
    # the instance object anyway to declare the node allocation lock.
79
    if instance.disk_template in constants.DTS_EXT_MIRROR:
80
      if lu.op.target_node is None:
81
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
82
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
83
      else:
84
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
85
                                               lu.op.target_node_uuid]
86
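      # The node lock set is now final for externally mirrored templates, so
      # drop the recalculation requested in _ExpandNamesForMigration.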
      del lu.recalculate_locks[locking.LEVEL_NODE]
87
    else:
88
      lu._LockInstancesNodes() # pylint: disable=W0212
89

    
90
  elif level == locking.LEVEL_NODE:
91
    # Node locks are declared together with the node allocation lock
92
    assert (lu.needed_locks[locking.LEVEL_NODE] or
93
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
94

    
95
  elif level == locking.LEVEL_NODE_RES:
96
    # Copy node locks
97
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
98
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
99

    
100

    
101
class LUInstanceFailover(LogicalUnit):
102
  """Failover an instance.
103

104
  """
105
  HPATH = "instance-failover"
106
  HTYPE = constants.HTYPE_INSTANCE
107
  REQ_BGL = False
108

    
109
  def CheckArguments(self):
110
    """Check the arguments.
111

112
    """
113
    self.iallocator = getattr(self.op, "iallocator", None)
114
    self.target_node = getattr(self.op, "target_node", None)
115

    
116
  def ExpandNames(self):
117
    self._ExpandAndLockInstance()
118
    _ExpandNamesForMigration(self)
119

    
120
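    # Positional flags below: failover=True, fallback=False,
    # allow_runtime_changes=True; see TLMigrateInstance.__init__.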
    self._migrater = \
121
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
122
                        self.op.cleanup, True, False,
123
                        self.op.ignore_consistency, True,
124
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
125

    
126
    self.tasklets = [self._migrater]
127

    
128
  def DeclareLocks(self, level):
129
    _DeclareLocksForMigration(self, level)
130

    
131
  def BuildHooksEnv(self):
132
    """Build hooks env.
133

134
    This runs on master, primary and secondary nodes of the instance.
135

136
    """
137
    instance = self._migrater.instance
138
    source_node_uuid = instance.primary_node
139
    env = {
140
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
141
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
142
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
143
      "NEW_PRIMARY": self.op.target_node,
144
      "FAILOVER_CLEANUP": self.op.cleanup,
145
      }
146

    
147
    if instance.disk_template in constants.DTS_INT_MIRROR:
148
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
149
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
150
    else:
151
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
152

    
153
    env.update(BuildInstanceHookEnvByObject(self, instance))
154

    
155
    return env
156

    
157
  def BuildHooksNodes(self):
158
    """Build hooks nodes.
159

160
    """
161
    instance = self._migrater.instance
162
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
163
    return (nl, nl + [instance.primary_node])
164

    
165

    
166
class LUInstanceMigrate(LogicalUnit):
167
  """Migrate an instance.
168

169
  This is migration without shutting down, compared to the failover,
170
  which is done with shutdown.
171

172
  """
173
  HPATH = "instance-migrate"
174
  HTYPE = constants.HTYPE_INSTANCE
175
  REQ_BGL = False
176

    
177
  def ExpandNames(self):
178
    self._ExpandAndLockInstance()
179
    _ExpandNamesForMigration(self)
180

    
181
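    # Positional flags below: failover=False, fallback=self.op.allow_failover,
    # ignore_consistency=False; see TLMigrateInstance.__init__.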
    self._migrater = \
182
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
183
                        self.op.cleanup, False, self.op.allow_failover, False,
184
                        self.op.allow_runtime_changes,
185
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
186
                        self.op.ignore_ipolicy)
187

    
188
    self.tasklets = [self._migrater]
189

    
190
  def DeclareLocks(self, level):
191
    _DeclareLocksForMigration(self, level)
192

    
193
  def BuildHooksEnv(self):
194
    """Build hooks env.
195

196
    This runs on master, primary and secondary nodes of the instance.
197

198
    """
199
    instance = self._migrater.instance
200
    source_node_uuid = instance.primary_node
201
    env = BuildInstanceHookEnvByObject(self, instance)
202
    env.update({
203
      "MIGRATE_LIVE": self._migrater.live,
204
      "MIGRATE_CLEANUP": self.op.cleanup,
205
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
206
      "NEW_PRIMARY": self.op.target_node,
207
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
208
      })
209

    
210
    if instance.disk_template in constants.DTS_INT_MIRROR:
211
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
212
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
213
    else:
214
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
215

    
216
    return env
217

    
218
  def BuildHooksNodes(self):
219
    """Build hooks nodes.
220

221
    """
222
    instance = self._migrater.instance
223
    snode_uuids = list(instance.secondary_nodes)
224
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
225
    return (nl, nl)
226

    
227

    
228
class TLMigrateInstance(Tasklet):
229
  """Tasklet class for instance migration.
230

231
  @type live: boolean
232
  @ivar live: whether the migration will be done live or non-live;
233
      this variable is initialized only after CheckPrereq has run
234
  @type cleanup: boolean
235
  @ivar cleanup: Whether we clean up from a failed migration
236
  @type iallocator: string
237
  @ivar iallocator: The iallocator used to determine target_node
238
  @type target_node_uuid: string
239
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
240
      instance to
241
  @type failover: boolean
242
  @ivar failover: Whether operation results in failover or migration
243
  @type fallback: boolean
244
  @ivar fallback: Whether fallback to failover is allowed if migration is not
245
                  possible
246
  @type ignore_consistency: boolean
247
  @ivar ignore_consistency: Whether we should ignore consistency between source
248
                            and target node
249
  @type shutdown_timeout: int
250
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
251
  @type ignore_ipolicy: bool
252
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
253

254
  """
255

    
256
  # Constants
257
  _MIGRATION_POLL_INTERVAL = 1      # seconds
258
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
259

    
260
  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
261
               fallback, ignore_consistency, allow_runtime_changes,
262
               shutdown_timeout, ignore_ipolicy):
263
    """Initializes this class.
264

265
    """
266
    Tasklet.__init__(self, lu)
267

    
268
    # Parameters
269
    self.instance_uuid = instance_uuid
270
    self.instance_name = instance_name
271
    self.cleanup = cleanup
272
    self.live = False # will be overridden later
273
    self.failover = failover
274
    self.fallback = fallback
275
    self.ignore_consistency = ignore_consistency
276
    self.shutdown_timeout = shutdown_timeout
277
    self.ignore_ipolicy = ignore_ipolicy
278
    self.allow_runtime_changes = allow_runtime_changes
279

    
280
  def CheckPrereq(self):
281
    """Check prerequisites.
282

283
    This checks that the instance is in the cluster.
284

285
    """
286
    (self.instance_uuid, self.instance_name) = \
287
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
288
                                self.instance_name)
289
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
290
    assert self.instance is not None
291
    cluster = self.cfg.GetClusterInfo()
292

    
293
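    # If the instance is not marked up, migrating it makes no sense; when a
    # fallback is allowed, turn the request into a failover instead.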
    if (not self.cleanup and
294
        not self.instance.admin_state == constants.ADMINST_UP and
295
        not self.failover and self.fallback):
296
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
297
                      " switching to failover")
298
      self.failover = True
299

    
300
    if self.instance.disk_template not in constants.DTS_MIRRORED:
301
      if self.failover:
302
        text = "failovers"
303
      else:
304
        text = "migrations"
305
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
306
                                 " %s" % (self.instance.disk_template, text),
307
                                 errors.ECODE_STATE)
308

    
309
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
310
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
311

    
312
      if self.lu.op.iallocator:
313
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
314
        self._RunAllocator()
315
      else:
316
        # We set self.target_node_uuid as it is required by
317
        # BuildHooksEnv
318
        self.target_node_uuid = self.lu.op.target_node_uuid
319

    
320
      # Check that the target node is correct in terms of instance policy
321
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
322
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
323
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
324
                                                              group_info)
325
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
326
                             self.cfg, ignore=self.ignore_ipolicy)
327

    
328
      # self.target_node is already populated, either directly or by the
329
      # iallocator run
330
      target_node_uuid = self.target_node_uuid
331
      if self.target_node_uuid == self.instance.primary_node:
332
        raise errors.OpPrereqError(
333
          "Cannot migrate instance %s to its primary (%s)" %
334
          (self.instance.name,
335
           self.cfg.GetNodeName(self.instance.primary_node)),
336
          errors.ECODE_STATE)
337

    
338
      if len(self.lu.tasklets) == 1:
339
        # It is safe to release locks only when we're the only tasklet
340
        # in the LU
341
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
342
                     keep=[self.instance.primary_node, self.target_node_uuid])
343
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
344

    
345
    else:
346
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
347

    
348
      secondary_node_uuids = self.instance.secondary_nodes
349
      if not secondary_node_uuids:
350
        raise errors.ConfigurationError("No secondary node but using"
351
                                        " %s disk template" %
352
                                        self.instance.disk_template)
353
      target_node_uuid = secondary_node_uuids[0]
354
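      # Internally mirrored (DRBD) instances can only move to their current
      # secondary node; reject an iallocator or any other explicit target.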
      if self.lu.op.iallocator or \
355
        (self.lu.op.target_node_uuid and
356
         self.lu.op.target_node_uuid != target_node_uuid):
357
        if self.failover:
358
          text = "failed over"
359
        else:
360
          text = "migrated"
361
        raise errors.OpPrereqError("Instances with disk template %s cannot"
362
                                   " be %s to arbitrary nodes"
363
                                   " (neither an iallocator nor a target"
364
                                   " node can be passed)" %
365
                                   (self.instance.disk_template, text),
366
                                   errors.ECODE_INVAL)
367
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
368
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
369
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
370
                                                              group_info)
371
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
372
                             self.cfg, ignore=self.ignore_ipolicy)
373

    
374
    i_be = cluster.FillBE(self.instance)
375

    
376
    # check memory requirements on the secondary node
377
    if (not self.cleanup and
378
         (not self.failover or
379
           self.instance.admin_state == constants.ADMINST_UP)):
380
      self.tgt_free_mem = CheckNodeFreeMemory(
381
          self.lu, target_node_uuid,
382
          "migrating instance %s" % self.instance.name,
383
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
384
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
385
    else:
386
      self.lu.LogInfo("Not checking memory on the secondary node as"
387
                      " instance will not be started")
388

    
389
    # check if failover must be forced instead of migration
390
    if (not self.cleanup and not self.failover and
391
        i_be[constants.BE_ALWAYS_FAILOVER]):
392
      self.lu.LogInfo("Instance configured to always failover; fallback"
393
                      " to failover")
394
      self.failover = True
395

    
396
    # check bridge existence
397
    CheckInstanceBridgesExist(self.lu, self.instance,
398
                              node_uuid=target_node_uuid)
399

    
400
    if not self.cleanup:
401
      CheckNodeNotDrained(self.lu, target_node_uuid)
402
      if not self.failover:
403
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
404
                                                   self.instance)
405
        if result.fail_msg and self.fallback:
406
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
407
                          " failover")
408
          self.failover = True
409
        else:
410
          result.Raise("Can't migrate, please use failover",
411
                       prereq=True, ecode=errors.ECODE_STATE)
412

    
413
    assert not (self.failover and self.cleanup)
414

    
415
    if not self.failover:
416
      if self.lu.op.live is not None and self.lu.op.mode is not None:
417
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
418
                                   " parameters are accepted",
419
                                   errors.ECODE_INVAL)
420
      if self.lu.op.live is not None:
421
        if self.lu.op.live:
422
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
423
        else:
424
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
425
        # reset the 'live' parameter to None so that repeated
426
        # invocations of CheckPrereq do not raise an exception
427
        self.lu.op.live = None
428
      elif self.lu.op.mode is None:
429
        # read the default value from the hypervisor
430
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
431
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
432

    
433
      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
434
    else:
435
      # Failover is never live
436
      self.live = False
437

    
438
    if not (self.failover or self.cleanup):
439
      remote_info = self.rpc.call_instance_info(
440
          self.instance.primary_node, self.instance.name,
441
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
442
      remote_info.Raise("Error checking instance on node %s" %
443
                        self.cfg.GetNodeName(self.instance.primary_node))
444
      instance_running = bool(remote_info.payload)
445
      if instance_running:
446
        self.current_mem = int(remote_info.payload["memory"])
447

    
448
  def _RunAllocator(self):
449
    """Run the allocator based on input opcode.
450

451
    """
452
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
453

    
454
    # FIXME: add a self.ignore_ipolicy option
455
    req = iallocator.IAReqRelocate(
456
          inst_uuid=self.instance_uuid,
457
          relocate_from_node_uuids=[self.instance.primary_node])
458
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
459

    
460
    ial.Run(self.lu.op.iallocator)
461

    
462
    if not ial.success:
463
      raise errors.OpPrereqError("Can't compute nodes using"
464
                                 " iallocator '%s': %s" %
465
                                 (self.lu.op.iallocator, ial.info),
466
                                 errors.ECODE_NORES)
467
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
468
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
469
                    self.instance_name, self.lu.op.iallocator,
470
                    utils.CommaJoin(ial.result))
471

    
472
  def _WaitUntilSync(self):
473
    """Poll with custom rpc for disk sync.
474

475
    This uses our own step-based rpc call.
476

477
    """
478
    self.feedback_fn("* wait until resync is done")
479
    all_done = False
480
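    # Each node reports a (done, sync_percent) pair; keep polling until all
    # nodes are done, reporting the slowest node's progress in the meantime.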
    while not all_done:
481
      all_done = True
482
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
483
                                            (self.instance.disks,
484
                                             self.instance))
485
      min_percent = 100
486
      for node_uuid, nres in result.items():
487
        nres.Raise("Cannot resync disks on node %s" %
488
                   self.cfg.GetNodeName(node_uuid))
489
        node_done, node_percent = nres.payload
490
        all_done = all_done and node_done
491
        if node_percent is not None:
492
          min_percent = min(min_percent, node_percent)
493
      if not all_done:
494
        if min_percent < 100:
495
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
496
        time.sleep(2)
497

    
498
  def _EnsureSecondary(self, node_uuid):
499
    """Demote a node to secondary.
500

501
    """
502
    self.feedback_fn("* switching node %s to secondary mode" %
503
                     self.cfg.GetNodeName(node_uuid))
504

    
505
    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
506
                                          (self.instance.disks, self.instance))
507
    result.Raise("Cannot change disk to secondary on node %s" %
508
                 self.cfg.GetNodeName(node_uuid))
509

    
510
  def _GoStandalone(self):
511
    """Disconnect from the network.
512

513
    """
514
    self.feedback_fn("* changing into standalone mode")
515
    result = self.rpc.call_drbd_disconnect_net(
516
               self.all_node_uuids, (self.instance.disks, self.instance))
517
    for node_uuid, nres in result.items():
518
      nres.Raise("Cannot disconnect disks node %s" %
519
                 self.cfg.GetNodeName(node_uuid))
520

    
521
  def _GoReconnect(self, multimaster):
522
    """Reconnect to the network.
523

524
    """
525
    if multimaster:
526
      msg = "dual-master"
527
    else:
528
      msg = "single-master"
529
    self.feedback_fn("* changing disks into %s mode" % msg)
530
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids,
531
                                           (self.instance.disks, self.instance),
532
                                           self.instance.name, multimaster)
533
    for node_uuid, nres in result.items():
534
      nres.Raise("Cannot change disks config on node %s" %
535
                 self.cfg.GetNodeName(node_uuid))
536

    
537
  def _ExecCleanup(self):
538
    """Try to cleanup after a failed migration.
539

540
    The cleanup is done by:
541
      - check that the instance is running only on one node
542
        (and update the config if needed)
543
      - change disks on its secondary node to secondary
544
      - wait until disks are fully synchronized
545
      - disconnect from the network
546
      - change disks into single-master mode
547
      - wait again until disks are fully synchronized
548

549
    """
550
    # check running on only one node
551
    self.feedback_fn("* checking where the instance actually runs"
552
                     " (if this hangs, the hypervisor might be in"
553
                     " a bad state)")
554
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
555
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
556
                                        [self.instance.hypervisor],
557
                                        cluster_hvparams)
558
    for node_uuid, result in ins_l.items():
559
      result.Raise("Can't contact node %s" % node_uuid)
560

    
561
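    # call_instance_list returns the instance names running on each node; use
    # it to determine where the instance is actually alive.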
    runningon_source = self.instance.name in \
562
                         ins_l[self.source_node_uuid].payload
563
    runningon_target = self.instance.name in \
564
                         ins_l[self.target_node_uuid].payload
565

    
566
    if runningon_source and runningon_target:
567
      raise errors.OpExecError("Instance seems to be running on two nodes,"
568
                               " or the hypervisor is confused; you will have"
569
                               " to ensure manually that it runs only on one"
570
                               " and restart this operation")
571

    
572
    if not (runningon_source or runningon_target):
573
      raise errors.OpExecError("Instance does not seem to be running at all;"
574
                               " in this case it's safer to repair by"
575
                               " running 'gnt-instance stop' to ensure disk"
576
                               " shutdown, and then restarting it")
577

    
578
    if runningon_target:
579
      # the migration has actually succeeded, we need to update the config
580
      self.feedback_fn("* instance running on secondary node (%s),"
581
                       " updating config" %
582
                       self.cfg.GetNodeName(self.target_node_uuid))
583
      self.instance.primary_node = self.target_node_uuid
584
      self.cfg.Update(self.instance, self.feedback_fn)
585
      demoted_node_uuid = self.source_node_uuid
586
    else:
587
      self.feedback_fn("* instance confirmed to be running on its"
588
                       " primary node (%s)" %
589
                       self.cfg.GetNodeName(self.source_node_uuid))
590
      demoted_node_uuid = self.target_node_uuid
591

    
592
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
593
      self._EnsureSecondary(demoted_node_uuid)
594
      try:
595
        self._WaitUntilSync()
596
      except errors.OpExecError:
597
        # we ignore errors here, since if the device is standalone, it
598
        # won't be able to sync
599
        pass
600
      self._GoStandalone()
601
      self._GoReconnect(False)
602
      self._WaitUntilSync()
603

    
604
    self.feedback_fn("* done")
605

    
606
  def _RevertDiskStatus(self):
607
    """Try to revert the disk status after a failed migration.
608

609
    """
610
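    # Externally mirrored disk templates (e.g. sharedfile or RBD) have no
    # DRBD roles to switch back, so there is nothing to revert.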
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
611
      return
612

    
613
    try:
614
      self._EnsureSecondary(self.target_node_uuid)
615
      self._GoStandalone()
616
      self._GoReconnect(False)
617
      self._WaitUntilSync()
618
    except errors.OpExecError, err:
619
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
620
                         " please try to recover the instance manually;"
621
                         " error '%s'" % str(err))
622

    
623
  def _AbortMigration(self):
624
    """Call the hypervisor code to abort a started migration.
625

626
    """
627
    abort_result = self.rpc.call_instance_finalize_migration_dst(
628
                     self.target_node_uuid, self.instance, self.migration_info,
629
                     False)
630
    abort_msg = abort_result.fail_msg
631
    if abort_msg:
632
      logging.error("Aborting migration failed on target node %s: %s",
633
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
634
      # Don't raise an exception here, as we still have to try to revert the
635
      # disk status, even if this step failed.
636

    
637
    abort_result = self.rpc.call_instance_finalize_migration_src(
638
      self.source_node_uuid, self.instance, False, self.live)
639
    abort_msg = abort_result.fail_msg
640
    if abort_msg:
641
      logging.error("Aborting migration failed on source node %s: %s",
642
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
643

    
644
  def _ExecMigration(self):
645
    """Migrate an instance.
646

647
    The migrate is done by:
648
      - change the disks into dual-master mode
649
      - wait until disks are fully synchronized again
650
      - migrate the instance
651
      - change disks on the new secondary node (the old primary) to secondary
652
      - wait until disks are fully synchronized
653
      - change disks into single-master mode
654

655
    """
656
    # Check for hypervisor version mismatch and warn the user.
657
    hvspecs = [(self.instance.hypervisor,
658
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
659
    nodeinfo = self.rpc.call_node_info(
660
                 [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
661
    for ninfo in nodeinfo.values():
662
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
663
                  ninfo.node)
664
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
665
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload
666

    
667
    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
668
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
669
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
670
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
671
      if src_version != dst_version:
672
        self.feedback_fn("* warning: hypervisor version mismatch between"
673
                         " source (%s) and target (%s) node" %
674
                         (src_version, dst_version))
675

    
676
    self.feedback_fn("* checking disk consistency between source and target")
677
    for (idx, dev) in enumerate(self.instance.disks):
678
      if not CheckDiskConsistency(self.lu, self.instance, dev,
679
                                  self.target_node_uuid,
680
                                  False):
681
        raise errors.OpExecError("Disk %s is degraded or not fully"
682
                                 " synchronized on target node,"
683
                                 " aborting migration" % idx)
684

    
685
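    # The target node may have less free memory than the instance currently
    # uses; if runtime changes are allowed, balloon the instance down to fit.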
    if self.current_mem > self.tgt_free_mem:
686
      if not self.allow_runtime_changes:
687
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
688
                                 " free memory to fit instance %s on target"
689
                                 " node %s (have %dMB, need %dMB)" %
690
                                 (self.instance.name,
691
                                  self.cfg.GetNodeName(self.target_node_uuid),
692
                                  self.tgt_free_mem, self.current_mem))
693
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
694
      rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
695
                                                     self.instance,
696
                                                     self.tgt_free_mem)
697
      rpcres.Raise("Cannot modify instance runtime memory")
698

    
699
    # First get the migration information from the remote node
700
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
701
    msg = result.fail_msg
702
    if msg:
703
      log_err = ("Failed fetching source migration information from %s: %s" %
704
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
705
      logging.error(log_err)
706
      raise errors.OpExecError(log_err)
707

    
708
    self.migration_info = migration_info = result.payload
709

    
710
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
711
      # Then switch the disks to master/master mode
712
      self._EnsureSecondary(self.target_node_uuid)
713
      self._GoStandalone()
714
      self._GoReconnect(True)
715
      self._WaitUntilSync()
716

    
717
    self.feedback_fn("* preparing %s to accept the instance" %
718
                     self.cfg.GetNodeName(self.target_node_uuid))
719
    result = self.rpc.call_accept_instance(self.target_node_uuid,
720
                                           self.instance,
721
                                           migration_info,
722
                                           self.nodes_ip[self.target_node_uuid])
723

    
724
    msg = result.fail_msg
725
    if msg:
726
      logging.error("Instance pre-migration failed, trying to revert"
727
                    " disk status: %s", msg)
728
      self.feedback_fn("Pre-migration failed, aborting")
729
      self._AbortMigration()
730
      self._RevertDiskStatus()
731
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
732
                               (self.instance.name, msg))
733

    
734
    self.feedback_fn("* migrating instance to %s" %
735
                     self.cfg.GetNodeName(self.target_node_uuid))
736
    cluster = self.cfg.GetClusterInfo()
737
    result = self.rpc.call_instance_migrate(
738
        self.source_node_uuid, cluster.cluster_name, self.instance,
739
        self.nodes_ip[self.target_node_uuid], self.live)
740
    msg = result.fail_msg
741
    if msg:
742
      logging.error("Instance migration failed, trying to revert"
743
                    " disk status: %s", msg)
744
      self.feedback_fn("Migration failed, aborting")
745
      self._AbortMigration()
746
      self._RevertDiskStatus()
747
      raise errors.OpExecError("Could not migrate instance %s: %s" %
748
                               (self.instance.name, msg))
749

    
750
    self.feedback_fn("* starting memory transfer")
751
    last_feedback = time.time()
752
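    # Poll the migration status every _MIGRATION_POLL_INTERVAL seconds and
    # emit a progress message every _MIGRATION_FEEDBACK_INTERVAL seconds until
    # the hypervisor reports the migration as no longer active.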
    while True:
753
      result = self.rpc.call_instance_get_migration_status(
754
                 self.source_node_uuid, self.instance)
755
      msg = result.fail_msg
756
      ms = result.payload   # MigrationStatus instance
757
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
758
        logging.error("Instance migration failed, trying to revert"
759
                      " disk status: %s", msg)
760
        self.feedback_fn("Migration failed, aborting")
761
        self._AbortMigration()
762
        self._RevertDiskStatus()
763
        if not msg:
764
          msg = "hypervisor returned failure"
765
        raise errors.OpExecError("Could not migrate instance %s: %s" %
766
                                 (self.instance.name, msg))
767

    
768
      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
769
        self.feedback_fn("* memory transfer complete")
770
        break
771

    
772
      if (utils.TimeoutExpired(last_feedback,
773
                               self._MIGRATION_FEEDBACK_INTERVAL) and
774
          ms.transferred_ram is not None):
775
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
776
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
777
        last_feedback = time.time()
778

    
779
      time.sleep(self._MIGRATION_POLL_INTERVAL)
780

    
781
    result = self.rpc.call_instance_finalize_migration_src(
782
               self.source_node_uuid, self.instance, True, self.live)
783
    msg = result.fail_msg
784
    if msg:
785
      logging.error("Instance migration succeeded, but finalization failed"
786
                    " on the source node: %s", msg)
787
      raise errors.OpExecError("Could not finalize instance migration: %s" %
788
                               msg)
789

    
790
    self.instance.primary_node = self.target_node_uuid
791

    
792
    # distribute new instance config to the other nodes
793
    self.cfg.Update(self.instance, self.feedback_fn)
794

    
795
    result = self.rpc.call_instance_finalize_migration_dst(
796
               self.target_node_uuid, self.instance, migration_info, True)
797
    msg = result.fail_msg
798
    if msg:
799
      logging.error("Instance migration succeeded, but finalization failed"
800
                    " on the target node: %s", msg)
801
      raise errors.OpExecError("Could not finalize instance migration: %s" %
802
                               msg)
803

    
804
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
805
      self._EnsureSecondary(self.source_node_uuid)
806
      self._WaitUntilSync()
807
      self._GoStandalone()
808
      self._GoReconnect(False)
809
      self._WaitUntilSync()
810

    
811
    # If the instance's disk template is `rbd' or `ext' and there was a
812
    # successful migration, unmap the device from the source node.
813
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
814
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
815
      self.feedback_fn("* unmapping instance's disks from %s" %
816
                       self.cfg.GetNodeName(self.source_node_uuid))
817
      for disk in disks:
818
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
819
                                                 (disk, self.instance))
820
        msg = result.fail_msg
821
        if msg:
822
          logging.error("Migration was successful, but couldn't unmap the"
823
                        " block device %s on source node %s: %s",
824
                        disk.iv_name,
825
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
826
          logging.error("You need to unmap the device %s manually on %s",
827
                        disk.iv_name,
828
                        self.cfg.GetNodeName(self.source_node_uuid))
829

    
830
    self.feedback_fn("* done")
831

    
832
  def _ExecFailover(self):
833
    """Failover an instance.
834

835
    The failover is done by shutting it down on its present node and
836
    starting it on the secondary.
837

838
    """
839
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)
840

    
841
    source_node_uuid = self.instance.primary_node
842

    
843
    if self.instance.disks_active:
844
      self.feedback_fn("* checking disk consistency between source and target")
845
      for (idx, dev) in enumerate(self.instance.disks):
846
        # for drbd, these are drbd over lvm
847
        if not CheckDiskConsistency(self.lu, self.instance, dev,
848
                                    self.target_node_uuid, False):
849
          if primary_node.offline:
850
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
851
                             " target node %s" %
852
                             (primary_node.name, idx,
853
                              self.cfg.GetNodeName(self.target_node_uuid)))
854
          elif not self.ignore_consistency:
855
            raise errors.OpExecError("Disk %s is degraded on target node,"
856
                                     " aborting failover" % idx)
857
    else:
858
      self.feedback_fn("* not checking disk consistency as instance is not"
859
                       " running")
860

    
861
    self.feedback_fn("* shutting down instance on source node")
862
    logging.info("Shutting down instance %s on node %s",
863
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))
864

    
865
    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
866
                                             self.shutdown_timeout,
867
                                             self.lu.op.reason)
868
    msg = result.fail_msg
869
    if msg:
870
      if self.ignore_consistency or primary_node.offline:
871
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
872
                           " proceeding anyway; please make sure node"
873
                           " %s is down; error details: %s",
874
                           self.instance.name,
875
                           self.cfg.GetNodeName(source_node_uuid),
876
                           self.cfg.GetNodeName(source_node_uuid), msg)
877
      else:
878
        raise errors.OpExecError("Could not shutdown instance %s on"
879
                                 " node %s: %s" %
880
                                 (self.instance.name,
881
                                  self.cfg.GetNodeName(source_node_uuid), msg))
882

    
883
    self.feedback_fn("* deactivating the instance's disks on source node")
884
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
885
      raise errors.OpExecError("Can't shut down the instance's disks")
886

    
887
    self.instance.primary_node = self.target_node_uuid
888
    # distribute new instance config to the other nodes
889
    self.cfg.Update(self.instance, self.feedback_fn)
890

    
891
    # Only start the instance if it's marked as up
892
    if self.instance.admin_state == constants.ADMINST_UP:
893
      self.feedback_fn("* activating the instance's disks on target node %s" %
894
                       self.cfg.GetNodeName(self.target_node_uuid))
895
      logging.info("Starting instance %s on node %s", self.instance.name,
896
                   self.cfg.GetNodeName(self.target_node_uuid))
897

    
898
      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
899
                                          ignore_secondaries=True)
900
      if not disks_ok:
901
        ShutdownInstanceDisks(self.lu, self.instance)
902
        raise errors.OpExecError("Can't activate the instance's disks")
903

    
904
      self.feedback_fn("* starting the instance on the target node %s" %
905
                       self.cfg.GetNodeName(self.target_node_uuid))
906
      result = self.rpc.call_instance_start(self.target_node_uuid,
907
                                            (self.instance, None, None), False,
908
                                            self.lu.op.reason)
909
      msg = result.fail_msg
910
      if msg:
911
        ShutdownInstanceDisks(self.lu, self.instance)
912
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
913
                                 (self.instance.name,
914
                                  self.cfg.GetNodeName(self.target_node_uuid),
915
                                  msg))
916

    
917
  def Exec(self, feedback_fn):
918
    """Perform the migration.
919

920
    """
921
    self.feedback_fn = feedback_fn
922
    self.source_node_uuid = self.instance.primary_node
923

    
924
    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
925
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
926
      self.target_node_uuid = self.instance.secondary_nodes[0]
927
      # Otherwise self.target_node has been populated either
928
      # directly, or through an iallocator.
929

    
930
    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
931
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
932
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))
933

    
934
    if self.failover:
935
      feedback_fn("Failover instance %s" % self.instance.name)
936
      self._ExecFailover()
937
    else:
938
      feedback_fn("Migrating instance %s" % self.instance.name)
939

    
940
      if self.cleanup:
941
        return self._ExecCleanup()
942
      else:
943
        return self._ExecMigration()