lib/cmdlib/instance_migration.py @ 87ed6b79

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance

def _ExpandNamesForMigration(lu):
45
  """Expands names for use with L{TLMigrateInstance}.
46

47
  @type lu: L{LogicalUnit}
48

49
  """
50
  if lu.op.target_node is not None:
51
    (lu.op.target_node_uuid, lu.op.target_node) = \
52
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
53

    
54
  lu.needed_locks[locking.LEVEL_NODE] = []
55
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
56

    
57
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
58
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
59

    
60
  # The node allocation lock is actually only needed for externally replicated
61
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
62
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
63

    
64

    
def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
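
# Illustrative summary (not part of the original code) of the lock sets that
# _DeclareLocksForMigration above ends up requesting, assuming the usual
# disk templates:
#
#   - externally mirrored (DTS_EXT_MIRROR), no target node given:
#       LEVEL_NODE = ALL_SET, LEVEL_NODE_ALLOC = ALL_SET
#   - externally mirrored, explicit target node:
#       LEVEL_NODE = [primary_node, target_node_uuid]
#   - internally mirrored (e.g. DRBD):
#       LEVEL_NODE = the instance's own nodes (via _LockInstancesNodes)
#
# LEVEL_NODE_RES is then a copy of the resulting LEVEL_NODE set.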


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snode_uuids = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl)
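
# Example (illustrative only; node names and values are invented) of the
# hooks environment that LUInstanceFailover.BuildHooksEnv produces for a
# DRBD-backed instance whose primary is node1 and whose secondary is node2:
#
#   {
#     "IGNORE_CONSISTENCY": False,
#     "SHUTDOWN_TIMEOUT": 120,
#     "OLD_PRIMARY": "node1", "NEW_PRIMARY": "node2",
#     "OLD_SECONDARY": "node2", "NEW_SECONDARY": "node1",
#     "FAILOVER_CLEANUP": False,
#     # ...plus the per-instance keys from BuildInstanceHookEnvByObject
#   }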


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up after a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration is not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between
                            source and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      secondary_node_uuids = self.instance.secondary_nodes
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      self.target_node_uuid = target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
        (self.lu.op.target_node_uuid and
         self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or
           self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node))
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(
          inst_uuid=self.instance_uuid,
          relocate_from_node_uuids=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node_uuid, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" %
                     self.cfg.GetNodeName(node_uuid))

    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
                                          (self.instance.disks, self.instance))
    result.Raise("Cannot change disk to secondary on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(
               self.all_node_uuids, (self.instance.disks, self.instance))
    for node_uuid, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))
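
  # Illustrative note (not part of the original code): the three helpers
  # above, together with _WaitUntilSync, are the building blocks used below
  # to move DRBD disks between modes.  For example, the sequence used by
  # _RevertDiskStatus (and, with an extra sync wait, by _ExecCleanup) to
  # return to normal single-master operation is, schematically:
  #
  #   self._EnsureSecondary(demoted_node_uuid)
  #   self._GoStandalone()
  #   self._GoReconnect(False)   # single-master
  #   self._WaitUntilSync()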

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.instance.primary_node = self.target_node_uuid
      self.cfg.Update(self.instance, self.feedback_fn)
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    abort_result = self.rpc.call_instance_finalize_migration_dst(
                     self.target_node_uuid, self.instance, self.migration_info,
                     False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      self.source_node_uuid, self.instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
                 [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(self.instance.disks):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
                                  False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
                                                     self.instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_accept_instance(self.target_node_uuid,
                                           self.instance,
                                           migration_info,
                                           self.nodes_ip[self.target_node_uuid])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* migrating instance to %s" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(
                 self.source_node_uuid, self.instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (self.instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(
               self.source_node_uuid, self.instance, True, self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self.instance.primary_node = self.target_node_uuid

    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(
               self.target_node_uuid, self.instance, migration_info, True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(self.source_node_uuid)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    source_node_uuid = self.instance.primary_node

    if self.instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(self.instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, self.instance, dev,
                                    self.target_node_uuid, False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx,
                              self.cfg.GetNodeName(self.target_node_uuid)))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))

    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(source_node_uuid),
                           self.cfg.GetNodeName(source_node_uuid), msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(source_node_uuid), msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.instance.primary_node = self.target_node_uuid
    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_instance_start(self.target_node_uuid,
                                            (self.instance, None, None), False,
                                            self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node_uuid = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()
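

# Illustrative only: these logical units are not normally instantiated by
# hand; they are reached through the corresponding opcodes.  A sketch,
# assuming the standard Ganeti client tools and opcode names:
#
#   gnt-instance failover instance1.example.com   # -> LUInstanceFailover
#   gnt-instance migrate  instance1.example.com   # -> LUInstanceMigrate
#
# or, from Python, by submitting opcodes.OpInstanceFailover /
# opcodes.OpInstanceMigrate to the master daemon, which dispatches them to
# the LUs defined in this module.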