#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instance migration and failover."""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
  CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
  ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
  CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
  CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist

import ganeti.masterd.instance


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    (lu.op.target_node_uuid, lu.op.target_node) = \
      ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node_uuid]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
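    # These attributes may be missing from the opcode, hence the getattr
    # defaults of None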
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

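    # Positional arguments map to TLMigrateInstance.__init__: cleanup,
    # failover=True, fallback=False, ignore_consistency,
    # allow_runtime_changes=True, shutdown_timeout, ignore_ipolicy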
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "FAILOVER_CLEANUP": self.op.cleanup,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
    nl = [self.cfg.GetMasterNode()] + list(secondary_nodes)
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

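    # Positional arguments map to TLMigrateInstance.__init__: cleanup,
    # failover=False, fallback=allow_failover, ignore_consistency=False,
    # allow_runtime_changes, shutdown_timeout, ignore_ipolicy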
    self._migrater = \
      TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
                        self.op.cleanup, False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node_uuid = instance.primary_node
    target_node_uuid = self._migrater.target_node_uuid
    env = BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
      "NEW_PRIMARY": self.cfg.GetNodeName(target_node_uuid),
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
      env["OLD_SECONDARY"] = self.cfg.GetNodeName(secondary_nodes[0])
      env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
    snode_uuids = list(secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
    nl.append(self._migrater.target_node_uuid)
    return (nl, nl)


class TLMigrateInstance(Tasklet):
  """Tasklet class for instance migration.

  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we clean up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node_uuid: string
  @ivar target_node_uuid: If given, the target node UUID to reallocate the
      instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """

  # Constants
  _MIGRATION_POLL_INTERVAL = 1      # seconds
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds

  def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
               fallback, ignore_consistency, allow_runtime_changes,
               shutdown_timeout, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.cleanup = cleanup
    self.live = False # will be overridden later
    self.failover = failover
    self.fallback = fallback
    self.ignore_consistency = ignore_consistency
    self.shutdown_timeout = shutdown_timeout
    self.ignore_ipolicy = ignore_ipolicy
    self.allow_runtime_changes = allow_runtime_changes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    (self.instance_uuid, self.instance_name) = \
      ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
                                self.instance_name)
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None
    cluster = self.cfg.GetClusterInfo()

    if (not self.cleanup and
        not self.instance.admin_state == constants.ADMINST_UP and
        not self.failover and self.fallback):
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
                      " switching to failover")
      self.failover = True

    if self.instance.disk_template not in constants.DTS_MIRRORED:
      if self.failover:
        text = "failovers"
      else:
        text = "migrations"
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
                                 " %s" % (self.instance.disk_template, text),
                                 errors.ECODE_STATE)

    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")

      if self.lu.op.iallocator:
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
        self._RunAllocator()
      else:
        # We set self.target_node_uuid as it is required by
        # BuildHooksEnv
        self.target_node_uuid = self.lu.op.target_node_uuid

      # Check that the target node is correct in terms of instance policy
      nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

      # self.target_node is already populated, either directly or by the
      # iallocator run
      target_node_uuid = self.target_node_uuid
      if self.target_node_uuid == self.instance.primary_node:
        raise errors.OpPrereqError(
          "Cannot migrate instance %s to its primary (%s)" %
          (self.instance.name,
           self.cfg.GetNodeName(self.instance.primary_node)),
          errors.ECODE_STATE)

      if len(self.lu.tasklets) == 1:
        # It is safe to release locks only when we're the only tasklet
        # in the LU
        ReleaseLocks(self.lu, locking.LEVEL_NODE,
                     keep=[self.instance.primary_node, self.target_node_uuid])
        ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    else:
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      secondary_node_uuids = self.cfg.GetInstanceSecondaryNodes(self.instance)
      if not secondary_node_uuids:
        raise errors.ConfigurationError("No secondary node but using"
                                        " %s disk template" %
                                        self.instance.disk_template)
      self.target_node_uuid = target_node_uuid = secondary_node_uuids[0]
      if self.lu.op.iallocator or \
        (self.lu.op.target_node_uuid and
         self.lu.op.target_node_uuid != target_node_uuid):
        if self.failover:
          text = "failed over"
        else:
          text = "migrated"
        raise errors.OpPrereqError("Instances with disk template %s cannot"
                                   " be %s to arbitrary nodes"
                                   " (neither an iallocator nor a target"
                                   " node can be passed)" %
                                   (self.instance.disk_template, text),
                                   errors.ECODE_INVAL)
      nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
                             self.cfg, ignore=self.ignore_ipolicy)

    i_be = cluster.FillBE(self.instance)

    # check memory requirements on the target node
    if (not self.cleanup and
         (not self.failover or
           self.instance.admin_state == constants.ADMINST_UP)):
      self.tgt_free_mem = CheckNodeFreeMemory(
          self.lu, target_node_uuid,
          "migrating instance %s" % self.instance.name,
          i_be[constants.BE_MINMEM], self.instance.hypervisor,
          self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
    else:
      self.lu.LogInfo("Not checking memory on the target node as"
                      " instance will not be started")

    # check if failover must be forced instead of migration
    if (not self.cleanup and not self.failover and
        i_be[constants.BE_ALWAYS_FAILOVER]):
      self.lu.LogInfo("Instance configured to always failover; fallback"
                      " to failover")
      self.failover = True

    # check bridge existence
    CheckInstanceBridgesExist(self.lu, self.instance,
                              node_uuid=target_node_uuid)

    if not self.cleanup:
      CheckNodeNotDrained(self.lu, target_node_uuid)
      if not self.failover:
        result = self.rpc.call_instance_migratable(self.instance.primary_node,
                                                   self.instance)
        if result.fail_msg and self.fallback:
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
                          " failover")
          self.failover = True
        else:
          result.Raise("Can't migrate, please use failover",
                       prereq=True, ecode=errors.ECODE_STATE)

    assert not (self.failover and self.cleanup)

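    # Resolve the migration mode: explicit "live"/"mode" opcode parameters
    # take precedence, otherwise the hypervisor's default migration mode
    # applies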
    if not self.failover:
      if self.lu.op.live is not None and self.lu.op.mode is not None:
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
                                   " parameters is accepted",
                                   errors.ECODE_INVAL)
      if self.lu.op.live is not None:
        if self.lu.op.live:
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
        else:
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
        # reset the 'live' parameter to None so that repeated
        # invocations of CheckPrereq do not raise an exception
        self.lu.op.live = None
      elif self.lu.op.mode is None:
        # read the default value from the hypervisor
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]

      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
    else:
      # Failover is never live
      self.live = False

    if not (self.failover or self.cleanup):
      remote_info = self.rpc.call_instance_info(
          self.instance.primary_node, self.instance.name,
          self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
      remote_info.Raise("Error checking instance on node %s" %
                        self.cfg.GetNodeName(self.instance.primary_node))
      instance_running = bool(remote_info.payload)
      if instance_running:
        self.current_mem = int(remote_info.payload["memory"])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)

    # FIXME: add a self.ignore_ipolicy option
    req = iallocator.IAReqRelocate(
          inst_uuid=self.instance_uuid,
          relocate_from_node_uuids=[self.instance.primary_node])
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.lu.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.lu.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                    self.instance_name, self.lu.op.iallocator,
                    utils.CommaJoin(ial.result))

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
                                            (self.instance.disks,
                                             self.instance))
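      # Track the least-synced node, so progress feedback reflects the
      # slowest disk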
      min_percent = 100
      for node_uuid, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" %
                   self.cfg.GetNodeName(node_uuid))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node_uuid):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" %
                     self.cfg.GetNodeName(node_uuid))

    result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
                                          (self.instance.disks, self.instance))
    result.Raise("Cannot change disk to secondary on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(
               self.all_node_uuids, (self.instance.disks, self.instance))
    for node_uuid, nres in result.items():
      nres.Raise("Cannot disconnect disks on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_node_uuids,
                                           (self.instance.disks, self.instance),
                                           self.instance.name, multimaster)
    for node_uuid, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" %
                 self.cfg.GetNodeName(node_uuid))

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    cluster_hvparams = self.cfg.GetClusterInfo().hvparams
    ins_l = self.rpc.call_instance_list(self.all_node_uuids,
                                        [self.instance.hypervisor],
                                        cluster_hvparams)
    for node_uuid, result in ins_l.items():
      result.Raise("Can't contact node %s" % node_uuid)

    runningon_source = self.instance.name in \
                         ins_l[self.source_node_uuid].payload
    runningon_target = self.instance.name in \
                         ins_l[self.target_node_uuid].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused; you will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all;"
                               " in this case it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      self.instance.primary_node = self.target_node_uuid
      self.cfg.Update(self.instance, self.feedback_fn)
      demoted_node_uuid = self.source_node_uuid
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      demoted_node_uuid = self.target_node_uuid

    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self._EnsureSecondary(demoted_node_uuid)
      try:
        self._WaitUntilSync()
      except errors.OpExecError:
        # we ignore errors here, since if the device is standalone, it
        # won't be able to sync
        pass
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
      return

    try:
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
                         " please try to recover the instance manually;"
                         " error '%s'" % str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
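    # The trailing False presumably marks the migration as unsuccessful, so
    # that both nodes clean up instead of committing the move (compare the
    # True passed by _ExecMigration on success)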
    abort_result = self.rpc.call_instance_finalize_migration_dst(
                     self.target_node_uuid, self.instance, self.migration_info,
                     False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s",
                    self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

    abort_result = self.rpc.call_instance_finalize_migration_src(
      self.source_node_uuid, self.instance, False, self.live)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on source node %s: %s",
                    self.cfg.GetNodeName(self.source_node_uuid), abort_msg)

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    # Check for hypervisor version mismatch and warn the user.
    hvspecs = [(self.instance.hypervisor,
                self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
    nodeinfo = self.rpc.call_node_info(
                 [self.source_node_uuid, self.target_node_uuid], None, hvspecs)
    for ninfo in nodeinfo.values():
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
                  ninfo.node)
    (_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
    (_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload

    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
      if src_version != dst_version:
        self.feedback_fn("* warning: hypervisor version mismatch between"
                         " source (%s) and target (%s) node" %
                         (src_version, dst_version))

    self.feedback_fn("* checking disk consistency between source and target")
    for (idx, dev) in enumerate(self.instance.disks):
      if not CheckDiskConsistency(self.lu, self.instance, dev,
                                  self.target_node_uuid,
                                  False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration" % idx)

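    # If the target node is short on memory, balloon the instance down to
    # fit, but only when runtime memory changes are allowed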
    if self.current_mem > self.tgt_free_mem:
      if not self.allow_runtime_changes:
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
                                 " free memory to fit instance %s on target"
                                 " node %s (have %dMB, need %dMB)" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  self.tgt_free_mem, self.current_mem))
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
      rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
                                                     self.instance,
                                                     self.tgt_free_mem)
      rpcres.Raise("Cannot modify instance runtime memory")

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (self.cfg.GetNodeName(self.source_node_uuid), msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      # Then switch the disks to master/master mode
      self._EnsureSecondary(self.target_node_uuid)
      self._GoStandalone()
      self._GoReconnect(True)
      self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    result = self.rpc.call_accept_instance(self.target_node_uuid,
                                           self.instance,
                                           migration_info,
                                           self.nodes_ip[self.target_node_uuid])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Pre-migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* migrating instance to %s" %
                     self.cfg.GetNodeName(self.target_node_uuid))
    cluster = self.cfg.GetClusterInfo()
    result = self.rpc.call_instance_migrate(
        self.source_node_uuid, cluster.cluster_name, self.instance,
        self.nodes_ip[self.target_node_uuid], self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self.feedback_fn("Migration failed, aborting")
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (self.instance.name, msg))

    self.feedback_fn("* starting memory transfer")
    last_feedback = time.time()
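    # Poll the hypervisor-reported migration status until it leaves the
    # "active" state, emitting progress feedback at most every
    # _MIGRATION_FEEDBACK_INTERVAL seconds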
    while True:
      result = self.rpc.call_instance_get_migration_status(
                 self.source_node_uuid, self.instance)
      msg = result.fail_msg
      ms = result.payload   # MigrationStatus instance
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
        logging.error("Instance migration failed, trying to revert"
                      " disk status: %s", msg)
        self.feedback_fn("Migration failed, aborting")
        self._AbortMigration()
        self._RevertDiskStatus()
        if not msg:
          msg = "hypervisor returned failure"
        raise errors.OpExecError("Could not migrate instance %s: %s" %
                                 (self.instance.name, msg))

      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
        self.feedback_fn("* memory transfer complete")
        break

      if (utils.TimeoutExpired(last_feedback,
                               self._MIGRATION_FEEDBACK_INTERVAL) and
          ms.transferred_ram is not None):
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
        last_feedback = time.time()

      time.sleep(self._MIGRATION_POLL_INTERVAL)

    result = self.rpc.call_instance_finalize_migration_src(
               self.source_node_uuid, self.instance, True, self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the source node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self.instance.primary_node = self.target_node_uuid

    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    result = self.rpc.call_instance_finalize_migration_dst(
               self.target_node_uuid, self.instance, migration_info, True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(self.source_node_uuid)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = ExpandCheckDisks(self.instance, self.instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" %
                       self.cfg.GetNodeName(self.source_node_uuid))
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
                                                 (disk, self.instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid), msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name,
                        self.cfg.GetNodeName(self.source_node_uuid))

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)

    source_node_uuid = self.instance.primary_node

    if self.instance.disks_active:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(self.instance.disks):
        # for drbd, these are drbd over lvm
        if not CheckDiskConsistency(self.lu, self.instance, dev,
                                    self.target_node_uuid, False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx,
                              self.cfg.GetNodeName(self.target_node_uuid)))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

    self.feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 self.instance.name, self.cfg.GetNodeName(source_node_uuid))

    result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
                                             self.shutdown_timeout,
                                             self.lu.op.reason)
    msg = result.fail_msg
    if msg:
      if self.ignore_consistency or primary_node.offline:
        self.lu.LogWarning("Could not shut down instance %s on node %s,"
                           " proceeding anyway; please make sure node"
                           " %s is down; error details: %s",
                           self.instance.name,
                           self.cfg.GetNodeName(source_node_uuid),
                           self.cfg.GetNodeName(source_node_uuid), msg)
      else:
        raise errors.OpExecError("Could not shut down instance %s on"
                                 " node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(source_node_uuid), msg))

    self.feedback_fn("* deactivating the instance's disks on source node")
    if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks")

    self.instance.primary_node = self.target_node_uuid
    # distribute new instance config to the other nodes
    self.cfg.Update(self.instance, self.feedback_fn)

    # Only start the instance if it's marked as up
    if self.instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      logging.info("Starting instance %s on node %s", self.instance.name,
                   self.cfg.GetNodeName(self.target_node_uuid))

      disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
                                          ignore_secondaries=True)
      if not disks_ok:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_instance_start(self.target_node_uuid,
                                            (self.instance, None, None), False,
                                            self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        ShutdownInstanceDisks(self.lu, self.instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (self.instance.name,
                                  self.cfg.GetNodeName(self.target_node_uuid),
                                  msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node_uuid = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance)
      self.target_node_uuid = secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

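    # Build the source/target node list and the secondary-IP map used by the
    # migration RPCs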
    self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
    self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                         in self.cfg.GetMultiNodeInfo(self.all_node_uuids))

    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()