Make migration RPC non-blocking
authorAndrea Spadaccini <spadaccio@google.com>
Thu, 22 Sep 2011 18:19:07 +0000 (19:19 +0100)
committerAndrea Spadaccini <spadaccio@google.com>
Thu, 29 Sep 2011 09:41:44 +0000 (10:41 +0100)
To add status reporting for the KVM migration, the instance_migrate RPC
must be non-blocking. Moreover, there must be a way to represent the
migration status and a way to fetch it.

* constants.py:
  - add constants representing the migration statuses

* objects.py:
  - add the MigrationStatus object

* hypervisor/hv_base.py:
  - change the FinalizeMigration method name to FinalizeMigrationDst
  - add the FinalizeMigrationSource method
  - add the GetMigrationStatus method

* hypervisor/hv_kvm.py:
  - change the implementation of MigrateInstance to be non-blocking
    (i.e. do not poll the status of the migration)
  - implement the new methods defined in BaseHypervisor

* backend.py, server/noded.py, rpc.py
  - add methods to call the new hypervisor methods
  - fix documentation of the existing methods to reflect the changes

* cmdlib.py
  - adapt the logic of TLMigrateInstance._ExecMigration to reflect
    the changes

Signed-off-by: Andrea Spadaccini <spadaccio@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>

lib/backend.py
lib/cmdlib.py
lib/constants.py
lib/hypervisor/hv_base.py
lib/hypervisor/hv_kvm.py
lib/objects.py
lib/rpc.py
lib/server/noded.py

index c38380d..6661a08 100644 (file)
@@ -1280,7 +1280,7 @@ def AcceptInstance(instance, info, target):
     _Fail("Failed to accept instance: %s", err, exc=True)
 
 
-def FinalizeMigration(instance, info, success):
+def FinalizeMigrationDst(instance, info, success):
   """Finalize any preparation to accept an instance.
 
   @type instance: L{objects.Instance}
@@ -1293,9 +1293,9 @@ def FinalizeMigration(instance, info, success):
   """
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
   try:
-    hyper.FinalizeMigration(instance, info, success)
+    hyper.FinalizeMigrationDst(instance, info, success)
   except errors.HypervisorError, err:
-    _Fail("Failed to finalize migration: %s", err, exc=True)
+    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
 
 
 def MigrateInstance(instance, target, live):
@@ -1319,6 +1319,46 @@ def MigrateInstance(instance, target, live):
     _Fail("Failed to migrate instance: %s", err, exc=True)
 
 
+def FinalizeMigrationSource(instance, success, live):
+  """Run the hypervisor-specific migration cleanup on the source node.
+
+  The hypervisor object is looked up from the instance definition and its
+  C{FinalizeMigrationSource} method is invoked; any exception it raises is
+  reported through L{_Fail}.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition of the migrated instance
+  @type success: bool
+  @param success: whether the migration succeeded or not
+  @type live: bool
+  @param live: whether the user requested a live migration or not
+  @raise RPCFail: If the execution fails for some reason
+
+  """
+  hv = hypervisor.GetHypervisor(instance.hypervisor)
+
+  try:
+    hv.FinalizeMigrationSource(instance, success, live)
+  except Exception, exc:  # pylint: disable=W0703
+    # Deliberately broad: whatever the hypervisor raises is turned into a
+    # failure report instead of escaping from the backend.
+    _Fail("Failed to finalize the migration on the source node: %s", exc,
+          exc=True)
+
+
+def GetMigrationStatus(instance):
+  """Query the hypervisor for the status of an ongoing migration.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance that is being migrated
+  @rtype: L{objects.MigrationStatus}
+  @return: the status of the current migration (one of
+           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+           progress info that can be retrieved from the hypervisor
+  @raise RPCFail: If the migration status cannot be retrieved
+
+  """
+  hv = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    return hv.GetMigrationStatus(instance)
+  except Exception, exc:  # pylint: disable=W0703
+    # Deliberately broad: any hypervisor error is reported via _Fail.
+    _Fail("Failed to get migration status: %s", exc, exc=True)
+
+
 def BlockdevCreate(disk, size, owner, on_primary, info):
   """Creates a block device for an instance.
 
index 8c45f01..b2a6227 100644 (file)
@@ -7025,6 +7025,10 @@ class TLMigrateInstance(Tasklet):
   @ivar shutdown_timeout: In case of failover timeout of the shutdown
 
   """
+
+  # Constants
+  _MIGRATION_POLL_INTERVAL = 0.5
+
   def __init__(self, lu, instance_name, cleanup=False,
                failover=False, fallback=False,
                ignore_consistency=False,
@@ -7348,12 +7352,13 @@ class TLMigrateInstance(Tasklet):
     """
     instance = self.instance
     target_node = self.target_node
+    source_node = self.source_node
     migration_info = self.migration_info
 
-    abort_result = self.rpc.call_finalize_migration(target_node,
-                                                    instance,
-                                                    migration_info,
-                                                    False)
+    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
+                                                                 instance,
+                                                                 migration_info,
+                                                                 False)
     abort_msg = abort_result.fail_msg
     if abort_msg:
       logging.error("Aborting migration failed on target node %s: %s",
@@ -7361,6 +7366,13 @@ class TLMigrateInstance(Tasklet):
       # Don't raise an exception here, as we stil have to try to revert the
       # disk status, even if this step failed.
 
+    abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
+        instance, False, self.live)
+    abort_msg = abort_result.fail_msg
+    if abort_msg:
+      logging.error("Aborting migration failed on source node %s: %s",
+                    source_node, abort_msg)
+
   def _ExecMigration(self):
     """Migrate an instance.
 
@@ -7432,18 +7444,51 @@ class TLMigrateInstance(Tasklet):
       raise errors.OpExecError("Could not migrate instance %s: %s" %
                                (instance.name, msg))
 
+    self.feedback_fn("* starting memory transfer")
+    while True:
+      result = self.rpc.call_instance_get_migration_status(source_node,
+                                                           instance)
+      msg = result.fail_msg
+      ms = result.payload   # MigrationStatus instance
+      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
+        logging.error("Instance migration failed, trying to revert"
+                      " disk status: %s", msg)
+        self.feedback_fn("Migration failed, aborting")
+        self._AbortMigration()
+        self._RevertDiskStatus()
+        raise errors.OpExecError("Could not migrate instance %s: %s" %
+                                 (instance.name, msg))
+
+      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
+        self.feedback_fn("* memory transfer complete")
+        break
+
+      time.sleep(self._MIGRATION_POLL_INTERVAL)
+
+    result = self.rpc.call_instance_finalize_migration_src(source_node,
+                                                           instance,
+                                                           True,
+                                                           self.live)
+    msg = result.fail_msg
+    if msg:
+      logging.error("Instance migration succeeded, but finalization failed"
+                    " on the source node: %s", msg)
+      raise errors.OpExecError("Could not finalize instance migration: %s" %
+                               msg)
+
     instance.primary_node = target_node
+
     # distribute new instance config to the other nodes
     self.cfg.Update(instance, self.feedback_fn)
 
-    result = self.rpc.call_finalize_migration(target_node,
-                                              instance,
-                                              migration_info,
-                                              True)
+    result = self.rpc.call_instance_finalize_migration_dst(target_node,
+                                                           instance,
+                                                           migration_info,
+                                                           True)
     msg = result.fail_msg
     if msg:
-      logging.error("Instance migration succeeded, but finalization failed:"
-                    " %s", msg)
+      logging.error("Instance migration succeeded, but finalization failed"
+                    " on the target node: %s", msg)
       raise errors.OpExecError("Could not finalize instance migration: %s" %
                                msg)
 
index dd72454..9dc202b 100644 (file)
@@ -797,6 +797,27 @@ HVS_PARAMETER_TYPES = {
 
 HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
 
+# Migration statuses
+# These are the values a hypervisor's GetMigrationStatus reports in the
+# "status" field of objects.MigrationStatus. The strings are the same ones
+# the KVM polling code used to compare against the 'info migrate' output.
+HV_MIGRATION_COMPLETED = "completed"
+HV_MIGRATION_ACTIVE = "active"
+HV_MIGRATION_FAILED = "failed"
+HV_MIGRATION_CANCELLED = "cancelled"
+
+# Every status a hypervisor is allowed to report
+HV_MIGRATION_VALID_STATUSES = frozenset([
+  HV_MIGRATION_COMPLETED,
+  HV_MIGRATION_ACTIVE,
+  HV_MIGRATION_FAILED,
+  HV_MIGRATION_CANCELLED,
+  ])
+
+# Statuses upon which the master aborts the migration and reverts the disks
+HV_MIGRATION_FAILED_STATUSES = frozenset([
+  HV_MIGRATION_FAILED,
+  HV_MIGRATION_CANCELLED,
+  ])
+
+# KVM-specific statuses
+# Currently an alias of the generic set; the KVM hypervisor checks the status
+# parsed from 'info migrate' against it.
+HV_KVM_MIGRATION_VALID_STATUSES = HV_MIGRATION_VALID_STATUSES
+
+
 # Backend parameter names
 BE_MEMORY = "memory"
 BE_VCPUS = "vcpus"
index 0886933..824842c 100644 (file)
@@ -291,8 +291,8 @@ class BaseHypervisor(object):
     """
     pass
 
-  def FinalizeMigration(self, instance, info, success):
-    """Finalized an instance migration.
+  def FinalizeMigrationDst(self, instance, info, success):
+    """Finalize the instance migration on the target node.
 
     Should finalize or revert any preparation done to accept the instance.
     Since by default we do no preparation, we also don't have anything to do
@@ -320,6 +320,32 @@ class BaseHypervisor(object):
     """
     raise NotImplementedError
 
+  def FinalizeMigrationSource(self, instance, success, live):
+    """Finalize the instance migration on the source node.
+
+    The base implementation does nothing; hypervisors that need to clean up
+    or resume the instance on the source node override this method.
+
+    @type instance: L{objects.Instance}
+    @param instance: the instance that was migrated
+    @type success: bool
+    @param success: whether the migration succeeded or not
+    @type live: bool
+    @param live: whether the user requested a live migration or not
+
+    """
+    pass
+
+
+  def GetMigrationStatus(self, instance):
+    """Get the migration status.
+
+    There is no generic implementation: the base class raises
+    C{NotImplementedError}, so each hypervisor has to provide its own.
+
+    @type instance: L{objects.Instance}
+    @param instance: the instance that is being migrated
+    @rtype: L{objects.MigrationStatus}
+    @return: the status of the current migration (one of
+             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+             progress info that can be retrieved from the hypervisor
+    @raise NotImplementedError: always, in this base implementation
+
+    """
+    raise NotImplementedError
+
   @classmethod
   def CheckParameterSyntax(cls, hvparams):
     """Check the given parameters for validity.
index 8ea1d71..2714dbd 100644 (file)
@@ -1574,8 +1574,8 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     incoming_address = (target, instance.hvparams[constants.HV_MIGRATION_PORT])
     self._ExecuteKVMRuntime(instance, kvm_runtime, incoming=incoming_address)
 
-  def FinalizeMigration(self, instance, info, success):
-    """Finalize an instance migration.
+  def FinalizeMigrationDst(self, instance, info, success):
+    """Finalize the instance migration on the target node.
 
     Stop the incoming mode KVM.
 
@@ -1622,7 +1622,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     """
     instance_name = instance.name
     port = instance.hvparams[constants.HV_MIGRATION_PORT]
-    pidfile, pid, alive = self._InstancePidAlive(instance_name)
+    _, _, alive = self._InstancePidAlive(instance_name)
     if not alive:
       raise errors.HypervisorError("Instance not running, cannot migrate")
 
@@ -1640,42 +1640,57 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     migrate_command = "migrate -d tcp:%s:%s" % (target, port)
     self._CallMonitorCommand(instance_name, migrate_command)
 
+  def FinalizeMigrationSource(self, instance, success, live):
+    """Finalize the instance migration on the source node.
+
+    On success the source KVM process is killed and its runtime files are
+    removed. On failure the instance is resumed through the monitor, so that
+    it keeps running on the source node.
+
+    @type instance: L{objects.Instance}
+    @param instance: the instance that was migrated
+    @type success: bool
+    @param success: whether the migration succeeded or not
+    @type live: bool
+    @param live: whether the user requested a live migration or not; it is
+        not used to decide whether to resume the instance (see below)
+
+    """
+    if success:
+      pidfile, pid, _ = self._InstancePidAlive(instance.name)
+      utils.KillProcess(pid)
+      self._RemoveInstanceRuntimeFiles(pidfile, instance.name)
+    else:
+      # The polling code this method replaces resumed the instance when the
+      # migration was *not* live; resuming only when "live" is set inverts
+      # that and leaves a failed non-live migration paused on the source.
+      # Resume on every failure instead.
+      # NOTE(review): assumes the continue command is a no-op for an
+      # instance that is already running -- confirm against the QEMU monitor
+      # documentation.
+      self._CallMonitorCommand(instance.name, self._CONT_CMD)
+
+
+  def GetMigrationStatus(self, instance):
+    """Get the migration status
+
+    @type instance: L{objects.Instance}
+    @param instance: the instance that is being migrated
+    @rtype: L{objects.MigrationStatus}
+    @return: the status of the current migration (one of
+             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+             progress info that can be retrieved from the hypervisor
+
+    """
     info_command = "info migrate"
-    done = False
-    broken_answers = 0
-    while not done:
-      result = self._CallMonitorCommand(instance_name, info_command)
+    for _ in range(self._MIGRATION_INFO_MAX_BAD_ANSWERS):
+      result = self._CallMonitorCommand(instance.name, info_command)
       match = self._MIGRATION_STATUS_RE.search(result.stdout)
       if not match:
-        broken_answers += 1
         if not result.stdout:
           logging.info("KVM: empty 'info migrate' result")
         else:
           logging.warning("KVM: unknown 'info migrate' result: %s",
                           result.stdout)
-        time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
       else:
         status = match.group(1)
-        if status == "completed":
-          done = True
-        elif status == "active":
-          # reset the broken answers count
-          broken_answers = 0
-          time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
-        elif status == "failed" or status == "cancelled":
-          if not live:
-            self._CallMonitorCommand(instance_name, self._CONT_CMD)
-          raise errors.HypervisorError("Migration %s at the kvm level" %
-                                       status)
-        else:
-          logging.warning("KVM: unknown migration status '%s'", status)
-          broken_answers += 1
-          time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
-      if broken_answers >= self._MIGRATION_INFO_MAX_BAD_ANSWERS:
-        raise errors.HypervisorError("Too many 'info migrate' broken answers")
+        if status in constants.HV_KVM_MIGRATION_VALID_STATUSES:
+          migration_status = objects.MigrationStatus(status=status)
+          return migration_status
 
-    utils.KillProcess(pid)
-    self._RemoveInstanceRuntimeFiles(pidfile, instance_name)
+        logging.warning("KVM: unknown migration status '%s'", status)
+
+      time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
+
+    return objects.MigrationStatus(status=constants.HV_MIGRATION_FAILED,
+                                  info="Too many 'info migrate' broken answers")
 
   def GetNodeInfo(self):
     """Return information about the node.
index 8cc7ac3..29f03e0 100644 (file)
@@ -1503,6 +1503,17 @@ class QueryFieldsResponse(_QueryResponseBase):
     ]
 
 
+class MigrationStatus(ConfigObject):
+  """Object holding the status of a migration.
+
+  @ivar status: the migration status, one of
+      L{constants.HV_MIGRATION_VALID_STATUSES}
+  @ivar info: optional free-text detail about the status; the KVM hypervisor
+      sets it when it gives up parsing the 'info migrate' output
+  @ivar transferred_ram: presumably the amount of RAM transferred so far; not
+      filled in by the code introduced together with this class
+  @ivar total_ram: presumably the total amount of RAM to transfer; not filled
+      in by the code introduced together with this class
+
+  """
+  # "info" must be listed here: the KVM hypervisor passes info=... to the
+  # constructor, and (assuming ConfigObject relies on __slots__ for its
+  # attributes and serialization -- TODO confirm) an undeclared name can
+  # neither be set nor survive the ToDict/FromDict round trip over RPC.
+  __slots__ = [
+    "status",
+    "info",
+    "transferred_ram",
+    "total_ram",
+    ]
+
+
 class InstanceConsole(ConfigObject):
   """Object describing how to access the console of an instance.
 
index 746fd74..b405900 100644 (file)
@@ -703,7 +703,7 @@ class RpcRunner(object):
                                 [self._InstDict(instance), info, target])
 
   @_RpcTimeout(_TMO_NORMAL)
-  def call_finalize_migration(self, node, instance, info, success):
+  def call_instance_finalize_migration_dst(self, node, instance, info, success):
     """Finalize any target-node migration specific operation.
 
     This is called both in case of a successful migration and in case of error
@@ -721,7 +721,7 @@ class RpcRunner(object):
     @param success: whether the migration was a success or a failure
 
     """
-    return self._SingleNodeCall(node, "finalize_migration",
+    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
                                 [self._InstDict(instance), info, success])
 
   @_RpcTimeout(_TMO_SLOW)
@@ -744,6 +744,43 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "instance_migrate",
                                 [self._InstDict(instance), target, live])
 
+  @_RpcTimeout(_TMO_SLOW)
+  def call_instance_finalize_migration_src(self, node, instance, success, live):
+    """Ask the source node to finalize an instance migration.
+
+    This is a single-node call.
+
+    @param node: the node the call is sent to
+    @type instance: L{objects.Instance}
+    @param instance: the instance that was migrated
+    @type success: bool
+    @param success: whether the migration succeeded or not
+    @type live: bool
+    @param live: whether the user requested a live migration or not
+
+    """
+    # The instance travels as a dictionary, the flags are sent as they are
+    args = [self._InstDict(instance), success, live]
+    return self._SingleNodeCall(node, "instance_finalize_migration_src", args)
+
+  @_RpcTimeout(_TMO_SLOW)
+  def call_instance_get_migration_status(self, node, instance):
+    """Fetch the status of an ongoing migration.
+
+    This is a single-node call that must be executed on the source node.
+
+    @param node: the node the call is sent to
+    @type instance: L{objects.Instance}
+    @param instance: the instance that is being migrated
+    @return: the RPC result; when the call succeeded and carries a payload,
+             the payload is an L{objects.MigrationStatus} holding the status
+             of the current migration (one of
+             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+             progress info that can be retrieved from the hypervisor
+
+    """
+    rpc_args = [self._InstDict(instance)]
+    result = self._SingleNodeCall(node, "instance_get_migration_status",
+                                  rpc_args)
+    # Failed calls and empty payloads are handed back untouched
+    if result.fail_msg or result.payload is None:
+      return result
+
+    # The node sends the status as a dictionary; rebuild the object from it
+    result.payload = objects.MigrationStatus.FromDict(result.payload)
+    return result
+
   @_RpcTimeout(_TMO_NORMAL)
   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
     """Reboots an instance.
index e852ca9..bdd534d 100644 (file)
@@ -579,13 +579,13 @@ class NodeHttpServer(http.server.HttpServer):
     return backend.AcceptInstance(instance, info, target)
 
   @staticmethod
-  def perspective_finalize_migration(params):
-    """Finalize the instance migration.
+  def perspective_instance_finalize_migration_dst(params):
+    """Finalize the instance migration on the destination node.
 
     """
     instance, info, success = params
     instance = objects.Instance.FromDict(instance)
-    return backend.FinalizeMigration(instance, info, success)
+    return backend.FinalizeMigrationDst(instance, info, success)
 
   @staticmethod
   def perspective_instance_migrate(params):
@@ -597,6 +597,23 @@ class NodeHttpServer(http.server.HttpServer):
     return backend.MigrateInstance(instance, target, live)
 
   @staticmethod
+  def perspective_instance_finalize_migration_src(params):
+    """Finalize the instance migration on the source node.
+
+    C{params} holds the serialized instance, the success flag and the live
+    flag, in this order.
+
+    """
+    (inst_dict, success, live) = params
+    migrated = objects.Instance.FromDict(inst_dict)
+    return backend.FinalizeMigrationSource(migrated, success, live)
+
+  @staticmethod
+  def perspective_instance_get_migration_status(params):
+    """Reports migration status.
+
+    The first element of C{params} is the serialized instance; the status
+    object returned by the backend is sent back as a dictionary.
+
+    """
+    migrating = objects.Instance.FromDict(params[0])
+    status = backend.GetMigrationStatus(migrating)
+    return status.ToDict()
+
+  @staticmethod
   def perspective_instance_reboot(params):
     """Reboot an instance.