lib/cmdlib/instance_migration.py @ 9f7d5fe4
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    (lu.op.target_node_uuid, lu.op.target_node) = \
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
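  # Typically driven by the "gnt-instance failover" client command; the
  # actual work is delegated to the TLMigrateInstance tasklet below.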
  HPATH = "instance-failover"
106
  HTYPE = constants.HTYPE_INSTANCE
107
  REQ_BGL = False
108

    
109
  def CheckArguments(self):
110
    """Check the arguments.
111

112
    """
113
    self.iallocator = getattr(self.op, "iallocator", None)
114
    self.target_node = getattr(self.op, "target_node", None)
115

    
116
  def ExpandNames(self):
117
    self._ExpandAndLockInstance()
118
    _ExpandNamesForMigration(self)
119

    
120
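    # Positional arguments below map to TLMigrateInstance.__init__ as
    # cleanup=False, failover=True, fallback=False,
    # ignore_consistency=self.op.ignore_consistency and
    # allow_runtime_changes=True, followed by the shutdown timeout and
    # ignore_ipolicy settings.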
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        False, True, False, self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.op.target_node,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
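  # Typically driven by the "gnt-instance migrate" client command; like
  # LUInstanceFailover, it delegates the real work to TLMigrateInstance.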
  HPATH = "instance-migrate"
172
  HTYPE = constants.HTYPE_INSTANCE
173
  REQ_BGL = False
174

    
175
  def ExpandNames(self):
176
    self._ExpandAndLockInstance()
177
    _ExpandNamesForMigration(self)
178

    
179
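    # Positional arguments below map to TLMigrateInstance.__init__ as
    # cleanup=self.op.cleanup, failover=False,
    # fallback=self.op.allow_failover, ignore_consistency=False and
    # allow_runtime_changes=self.op.allow_runtime_changes, followed by the
    # default shutdown timeout and ignore_ipolicy.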
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.op.target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snode_uuids = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration is not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between
                            source and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

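    # A stopped instance cannot be migrated; if the caller allowed a
    # fallback, switch the operation to a failover instead.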
    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

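      # Internally mirrored disk templates (e.g. DRBD) can only be moved to
      # the instance's current secondary node, so neither an iallocator nor
      # an arbitrary target node may be specified.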
      secondary_node_uuids = self.instance.secondary_nodes
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
        (self.lu.op.target_node_uuid and
         self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or
           self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

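    # Resolve the migration mode: an explicit 'live'/'mode' opcode parameter
    # wins, otherwise the hypervisor's default migration mode is used;
    # failovers are never live.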
    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node))
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(
          inst_uuid=self.instance_uuid,
          relocate_from_node_uuids=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                            self.nodes_ip,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node_uuid, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" %
                     self.cfg.GetNodeName(node_uuid))

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node_uuid)

    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_node_uuids,
                                               self.nodes_ip,
                                               self.instance.disks)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids, self.nodes_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.instance.primary_node = self.target_node_uuid
      self.cfg.Update(self.instance, self.feedback_fn)
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    abort_result = self.rpc.call_instance_finalize_migration_dst(
                     self.target_node_uuid, self.instance, self.migration_info,
                     False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      self.source_node_uuid, self.instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
                 [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(self.instance.disks):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
                                  False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

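    # If the instance's current runtime memory does not fit on the target
    # node, try to balloon it down to the target's free memory (only allowed
    # when runtime changes are permitted).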
    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
                                                     self.instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_accept_instance(self.target_node_uuid,
                                           self.instance,
                                           migration_info,
                                           self.nodes_ip[self.target_node_uuid])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* migrating instance to %s" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
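    # Poll the hypervisor-reported migration status every
    # _MIGRATION_POLL_INTERVAL seconds, emitting progress feedback at most
    # every _MIGRATION_FEEDBACK_INTERVAL seconds.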
    while True:
      result = self.rpc.call_instance_get_migration_status(
                 self.source_node_uuid, self.instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (self.instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(
               self.source_node_uuid, self.instance, True, self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self.instance.primary_node = self.target_node_uuid

    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(
               self.target_node_uuid, self.instance, migration_info, True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(self.source_node_uuid)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    source_node_uuid = self.instance.primary_node

    if self.instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(self.instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, self.instance, dev,
                                    self.target_node_uuid, False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx,
                              self.cfg.GetNodeName(self.target_node_uuid)))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))

    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(source_node_uuid),
                           self.cfg.GetNodeName(source_node_uuid), msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(source_node_uuid), msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.instance.primary_node = self.target_node_uuid
    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_instance_start(self.target_node_uuid,
                                            (self.instance, None, None), False,
                                            self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node_uuid = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()