lib/cmdlib/instance_migration.py @ 1c4910f7


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    (lu.op.target_node_uuid, lu.op.target_node) = \
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


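# Both logical units below are reached through the opcode layer (by naming
# convention OpInstanceFailover and OpInstanceMigrate, see ganeti/opcodes.py);
# the opcode fields consumed here include target_node, iallocator, live/mode,
# cleanup, allow_failover, allow_runtime_changes, ignore_consistency,
# ignore_ipolicy and shutdown_timeout.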
class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

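    # Positional arguments map onto TLMigrateInstance.__init__: cleanup,
    # failover=True, fallback=False, ignore_consistency,
    # allow_runtime_changes=True, shutdown_timeout, ignore_ipolicy.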
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

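    # Positional arguments map onto TLMigrateInstance.__init__: cleanup,
    # failover=False, fallback=allow_failover, ignore_consistency=False,
    # allow_runtime_changes, shutdown_timeout (the cluster default),
    # ignore_ipolicy.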
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snode_uuids = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

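    # Externally mirrored disk templates (e.g. sharedfile or RBD) can be moved
    # to any node, so the target comes from the opcode or an iallocator run;
    # internally mirrored ones (DRBD) can only go to their secondary node.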
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_node_uuids = self.instance.secondary_nodes
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      self.target_node_uuid = target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
        (self.lu.op.target_node_uuid and
         self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the secondary node
    if (not self.cleanup and
         (not self.failover or
           self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the secondary node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

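    # Resolve the migration mode: an explicit 'live' flag is translated into
    # the corresponding mode, otherwise the hypervisor's default migration
    # mode is used; a failover is never live.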
    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters are accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node))
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(
          inst_uuid=self.instance_uuid,
          relocate_from_node_uuids=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                            (self.instance.disks,
                                             self.instance))
      min_percent = 100
      for node_uuid, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

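  # The three helpers below drive the state of the instance's (DRBD) disks via
  # node RPCs: _EnsureSecondary closes the block devices on a node,
  # _GoStandalone disconnects the disks from the network, and _GoReconnect
  # re-attaches them in either single-master or dual-master mode.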
  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" %
                     self.cfg.GetNodeName(node_uuid))

    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
                                          (self.instance.disks, self.instance))
    result.Raise("Cannot change disk to secondary on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(
               self.all_node_uuids, (self.instance.disks, self.instance))
    for node_uuid, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.instance.primary_node = self.target_node_uuid
      self.cfg.Update(self.instance, self.feedback_fn)
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    abort_result = self.rpc.call_instance_finalize_migration_dst(
                     self.target_node_uuid, self.instance, self.migration_info,
                     False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      self.source_node_uuid, self.instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
                 [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(self.instance.disks):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
                                  False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

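    # current_mem (read from the running instance) and tgt_free_mem (from
    # CheckNodeFreeMemory) were collected in CheckPrereq; if the target node
    # is short on memory and runtime changes are allowed, balloon the
    # instance down to fit before starting the transfer.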
    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
                                                     self.instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

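    # For DRBD-based instances the disks are put into dual-master mode so
    # that both the source and the target node can access them while the
    # hypervisor hands the instance over.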
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_accept_instance(self.target_node_uuid,
                                           self.instance,
                                           migration_info,
                                           self.nodes_ip[self.target_node_uuid])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* migrating instance to %s" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (self.instance.name, msg))

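    # Poll the hypervisor's migration status every _MIGRATION_POLL_INTERVAL
    # seconds and report RAM transfer progress roughly every
    # _MIGRATION_FEEDBACK_INTERVAL seconds until the migration leaves the
    # "active" state.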
    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
    while True:
      result = self.rpc.call_instance_get_migration_status(
                 self.source_node_uuid, self.instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (self.instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

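    # The transfer is done: finalize on the source node, record the new
    # primary node in the cluster configuration, then finalize on the target
    # node; for DRBD the disks are finally switched back to single-master
    # mode with the old primary as secondary.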
    result = self.rpc.call_instance_finalize_migration_src(
               self.source_node_uuid, self.instance, True, self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self.instance.primary_node = self.target_node_uuid

    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(
               self.target_node_uuid, self.instance, migration_info, True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(self.source_node_uuid)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    source_node_uuid = self.instance.primary_node

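    # Disk consistency is only verified while the disks are active; a
    # degraded disk is tolerated if the primary node is offline or
    # ignore_consistency was requested.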
    if self.instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(self.instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, self.instance, dev,
                                    self.target_node_uuid, False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx,
                              self.cfg.GetNodeName(self.target_node_uuid)))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))

    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(source_node_uuid),
                           self.cfg.GetNodeName(source_node_uuid), msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(source_node_uuid), msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.instance.primary_node = self.target_node_uuid
    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_instance_start(self.target_node_uuid,
                                            (self.instance, None, None), False,
                                            self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node_uuid = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))

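    # Dispatch: failover shuts the instance down and restarts it on the
    # target, migration keeps it running; cleanup mode instead recovers the
    # configuration and disks after a previously failed migration.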
    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()