Remove two unused RPC functions
[ganeti-local] / lib / rpc.py
index 7d6b266..c6e7cb2 100644 (file)
@@ -31,7 +31,6 @@
 # R0904: Too many public methods
 
 import os
 # R0904: Too many public methods
 
 import os
-import socket
 import logging
 import zlib
 import base64
 import logging
 import zlib
 import base64
@@ -43,7 +42,8 @@ from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
 
 from ganeti import constants
 from ganeti import errors
 
-import ganeti.http.client
+# pylint has a bug here, doesn't see this import
+import ganeti.http.client  # pylint: disable-msg=W0611
 
 
 # Module level variable
 
 
 # Module level variable
@@ -56,10 +56,12 @@ def Init():
   Must be called before using any RPC function.
 
   """
   Must be called before using any RPC function.
 
   """
-  global _http_manager
+  global _http_manager # pylint: disable-msg=W0603
 
   assert not _http_manager, "RPC module initialized more than once"
 
 
   assert not _http_manager, "RPC module initialized more than once"
 
+  http.InitSsl()
+
   _http_manager = http.client.HttpClientManager()
 
 
   _http_manager = http.client.HttpClientManager()
 
 
@@ -69,7 +71,7 @@ def Shutdown():
   Must be called before quitting the program.
 
   """
   Must be called before quitting the program.
 
   """
-  global _http_manager
+  global _http_manager # pylint: disable-msg=W0603
 
   if _http_manager:
     _http_manager.Shutdown()
 
   if _http_manager:
     _http_manager.Shutdown()
@@ -83,75 +85,85 @@ class RpcResult(object):
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
-  @ivar data: the data payload, for successfull results, or None
-  @type failed: boolean
-  @ivar failed: whether the operation failed at RPC level (not
-      application level on the remote node)
+  @ivar data: the data payload, for successful results, or None
   @ivar call: the name of the RPC call
   @ivar node: the name of the node to which we made the call
   @ivar offline: whether the operation failed because the node was
       offline, as opposed to actual failure; offline=True will always
       imply failed=True, in order to allow simpler checking if
       the user doesn't care about the exact failure mode
   @ivar call: the name of the RPC call
   @ivar node: the name of the node to which we made the call
   @ivar offline: whether the operation failed because the node was
       offline, as opposed to actual failure; offline=True will always
       imply failed=True, in order to allow simpler checking if
       the user doesn't care about the exact failure mode
+  @ivar fail_msg: the error message if the call failed
 
   """
   def __init__(self, data=None, failed=False, offline=False,
                call=None, node=None):
 
   """
   def __init__(self, data=None, failed=False, offline=False,
                call=None, node=None):
-    self.failed = failed
     self.offline = offline
     self.call = call
     self.node = node
     self.offline = offline
     self.call = call
     self.node = node
+
     if offline:
     if offline:
-      self.failed = True
-      self.error = "Node is marked offline"
+      self.fail_msg = "Node is marked offline"
       self.data = self.payload = None
     elif failed:
       self.data = self.payload = None
     elif failed:
-      self.error = data
+      self.fail_msg = self._EnsureErr(data)
       self.data = self.payload = None
     else:
       self.data = data
       self.data = self.payload = None
     else:
       self.data = data
-      self.error = None
-      if isinstance(data, (tuple, list)) and len(data) == 2:
-        self.payload = data[1]
-      else:
+      if not isinstance(self.data, (tuple, list)):
+        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
+                         type(self.data))
         self.payload = None
         self.payload = None
+      elif len(data) != 2:
+        self.fail_msg = ("RPC layer error: invalid result length (%d), "
+                         "expected 2" % len(self.data))
+        self.payload = None
+      elif not self.data[0]:
+        self.fail_msg = self._EnsureErr(self.data[1])
+        self.payload = None
+      else:
+        # finally success
+        self.fail_msg = None
+        self.payload = data[1]
 
 
-  def Raise(self):
+    assert hasattr(self, "call")
+    assert hasattr(self, "data")
+    assert hasattr(self, "fail_msg")
+    assert hasattr(self, "node")
+    assert hasattr(self, "offline")
+    assert hasattr(self, "payload")
+
+  @staticmethod
+  def _EnsureErr(val):
+    """Helper to ensure we return a 'True' value for error."""
+    if val:
+      return val
+    else:
+      return "No error information"
+
+  def Raise(self, msg, prereq=False, ecode=None):
     """If the result has failed, raise an OpExecError.
 
     This is used so that LU code doesn't have to check for each
     result, but instead can call this function.
 
     """
     """If the result has failed, raise an OpExecError.
 
     This is used so that LU code doesn't have to check for each
     result, but instead can call this function.
 
     """
-    if self.failed:
-      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
-                               (self.call, self.node, self.error))
-
-  def RemoteFailMsg(self):
-    """Check if the remote procedure failed.
-
-    This is valid only for RPC calls which return result of the form
-    (status, data | error_msg).
-
-    @return: empty string for succcess, otherwise an error message
-
-    """
-    def _EnsureErr(val):
-      """Helper to ensure we return a 'True' value for error."""
-      if val:
-        return val
-      else:
-        return "No error information"
+    if not self.fail_msg:
+      return
 
 
-    if self.failed:
-      return _EnsureErr(self.error)
-    if not isinstance(self.data, (tuple, list)):
-      return "Invalid result type (%s)" % type(self.data)
-    if len(self.data) != 2:
-      return "Invalid result length (%d), expected 2" % len(self.data)
-    if not self.data[0]:
-      return _EnsureErr(self.data[1])
-    return ""
+    if not msg: # one could pass None for default message
+      msg = ("Call '%s' to node '%s' has failed: %s" %
+             (self.call, self.node, self.fail_msg))
+    else:
+      msg = "%s: %s" % (msg, self.fail_msg)
+    if prereq:
+      ec = errors.OpPrereqError
+    else:
+      ec = errors.OpExecError
+    if ecode is not None:
+      args = (msg, prereq)
+    else:
+      args = (msg, )
+    raise ec(*args) # pylint: disable-msg=W0142
 
 
 class Client:
 
 
 class Client:
@@ -161,7 +173,7 @@ class Client:
   list of nodes, will contact (in parallel) all nodes, and return a
   dict of results (key: node name, value: result).
 
   list of nodes, will contact (in parallel) all nodes, and return a
   dict of results (key: node name, value: result).
 
-  One current bug is that generic failure is still signalled by
+  One current bug is that generic failure is still signaled by
   'False' result, which is not good. This overloading of values can
   cause bugs.
 
   'False' result, which is not good. This overloading of values can
   cause bugs.
 
@@ -173,8 +185,8 @@ class Client:
     self.nc = {}
 
     self._ssl_params = \
     self.nc = {}
 
     self._ssl_params = \
-      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
-                         ssl_cert_path=constants.SSL_CERT_FILE)
+      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
+                         ssl_cert_path=constants.NODED_CERT_FILE)
 
   def ConnectList(self, node_list, address_list=None):
     """Add a list of nodes to the target nodes.
 
   def ConnectList(self, node_list, address_list=None):
     """Add a list of nodes to the target nodes.
@@ -220,7 +232,7 @@ class Client:
     @return: List of RPC results
 
     """
     @return: List of RPC results
 
     """
-    assert _http_manager, "RPC module not intialized"
+    assert _http_manager, "RPC module not initialized"
 
     _http_manager.ExecRequests(self.nc.values())
 
 
     _http_manager.ExecRequests(self.nc.values())
 
@@ -246,6 +258,21 @@ class Client:
     return results
 
 
     return results
 
 
+def _EncodeImportExportIO(ieio, ieioargs):
+  """Encodes import/export I/O information.
+
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    return (ieioargs[0].ToDict(), )
+
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    return (ieioargs[0].ToDict(), ieioargs[1])
+
+  return ieioargs
+
+
 class RpcRunner(object):
   """RPC runner class"""
 
 class RpcRunner(object):
   """RPC runner class"""
 
@@ -258,7 +285,7 @@ class RpcRunner(object):
 
     """
     self._cfg = cfg
 
     """
     self._cfg = cfg
-    self.port = utils.GetNodeDaemonPort()
+    self.port = utils.GetDaemonPort(constants.NODED)
 
   def _InstDict(self, instance, hvp=None, bep=None):
     """Convert the given instance to a dict.
 
   def _InstDict(self, instance, hvp=None, bep=None):
     """Convert the given instance to a dict.
@@ -269,9 +296,9 @@ class RpcRunner(object):
     @type instance: L{objects.Instance}
     @param instance: an Instance object
     @type hvp: dict or None
     @type instance: L{objects.Instance}
     @param instance: an Instance object
     @type hvp: dict or None
-    @param hvp: a dictionary with overriden hypervisor parameters
+    @param hvp: a dictionary with overridden hypervisor parameters
     @type bep: dict or None
     @type bep: dict or None
-    @param bep: a dictionary with overriden backend parameters
+    @param bep: a dictionary with overridden backend parameters
     @rtype: dict
     @return: the instance dict, with the hvparams filled with the
         cluster defaults
     @rtype: dict
     @return: the instance dict, with the hvparams filled with the
         cluster defaults
@@ -294,7 +321,7 @@ class RpcRunner(object):
   def _ConnectList(self, client, node_list, call):
     """Helper for computing node addresses.
 
   def _ConnectList(self, client, node_list, call):
     """Helper for computing node addresses.
 
-    @type client: L{Client}
+    @type client: L{ganeti.rpc.Client}
     @param client: a C{Client} instance
     @type node_list: list
     @param node_list: the node list we should connect
     @param client: a C{Client} instance
     @type node_list: list
     @param node_list: the node list we should connect
@@ -324,7 +351,7 @@ class RpcRunner(object):
   def _ConnectNode(self, client, node, call):
     """Helper for computing one node's address.
 
   def _ConnectNode(self, client, node, call):
     """Helper for computing one node's address.
 
-    @type client: L{Client}
+    @type client: L{ganeti.rpc.Client}
     @param client: a C{Client} instance
     @type node: str
     @param node: the node we should connect
     @param client: a C{Client} instance
     @type node: str
     @param node: the node we should connect
@@ -359,7 +386,7 @@ class RpcRunner(object):
 
     """
     body = serializer.DumpJson(args, indent=False)
 
     """
     body = serializer.DumpJson(args, indent=False)
-    c = Client(procedure, body, utils.GetNodeDaemonPort())
+    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
     c.ConnectList(node_list, address_list=address_list)
     return c.GetResults()
 
     c.ConnectList(node_list, address_list=address_list)
     return c.GetResults()
 
@@ -381,7 +408,7 @@ class RpcRunner(object):
 
     """
     body = serializer.DumpJson(args, indent=False)
 
     """
     body = serializer.DumpJson(args, indent=False)
-    c = Client(procedure, body, utils.GetNodeDaemonPort())
+    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
     c.ConnectNode(node)
     return c.GetResults()[node]
 
     c.ConnectNode(node)
     return c.GetResults()[node]
 
@@ -409,13 +436,13 @@ class RpcRunner(object):
   # Begin RPC calls
   #
 
   # Begin RPC calls
   #
 
-  def call_volume_list(self, node_list, vg_name):
+  def call_lv_list(self, node_list, vg_name):
     """Gets the logical volumes present in a given volume group.
 
     This is a multi-node call.
 
     """
     """Gets the logical volumes present in a given volume group.
 
     This is a multi-node call.
 
     """
-    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
+    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
 
   def call_vg_list(self, node_list):
     """Gets the volume group list.
 
   def call_vg_list(self, node_list):
     """Gets the volume group list.
@@ -425,6 +452,33 @@ class RpcRunner(object):
     """
     return self._MultiNodeCall(node_list, "vg_list", [])
 
     """
     return self._MultiNodeCall(node_list, "vg_list", [])
 
+  def call_storage_list(self, node_list, su_name, su_args, name, fields):
+    """Get list of storage units.
+
+    This is a multi-node call.
+
+    """
+    return self._MultiNodeCall(node_list, "storage_list",
+                               [su_name, su_args, name, fields])
+
+  def call_storage_modify(self, node, su_name, su_args, name, changes):
+    """Modify a storage unit.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "storage_modify",
+                                [su_name, su_args, name, changes])
+
+  def call_storage_execute(self, node, su_name, su_args, name, op):
+    """Executes an operation on a storage unit.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "storage_execute",
+                                [su_name, su_args, name, op])
+
   def call_bridges_exist(self, node, bridges_list):
     """Checks if a node has all the bridges given.
 
   def call_bridges_exist(self, node, bridges_list):
     """Checks if a node has all the bridges given.
 
@@ -446,14 +500,14 @@ class RpcRunner(object):
     idict = self._InstDict(instance, hvp=hvp, bep=bep)
     return self._SingleNodeCall(node, "instance_start", [idict])
 
     idict = self._InstDict(instance, hvp=hvp, bep=bep)
     return self._SingleNodeCall(node, "instance_start", [idict])
 
-  def call_instance_shutdown(self, node, instance):
+  def call_instance_shutdown(self, node, instance, timeout):
     """Stops an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_shutdown",
     """Stops an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_shutdown",
-                                [self._InstDict(instance)])
+                                [self._InstDict(instance), timeout])
 
   def call_migration_info(self, node, instance):
     """Gather the information necessary to prepare an instance migration.
 
   def call_migration_info(self, node, instance):
     """Gather the information necessary to prepare an instance migration.
@@ -527,32 +581,33 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "instance_migrate",
                                 [self._InstDict(instance), target, live])
 
     return self._SingleNodeCall(node, "instance_migrate",
                                 [self._InstDict(instance), target, live])
 
-  def call_instance_reboot(self, node, instance, reboot_type):
+  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
     """Reboots an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_reboot",
     """Reboots an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_reboot",
-                                [self._InstDict(instance), reboot_type])
+                                [self._InstDict(inst), reboot_type,
+                                 shutdown_timeout])
 
 
-  def call_instance_os_add(self, node, inst, reinstall):
+  def call_instance_os_add(self, node, inst, reinstall, debug):
     """Installs an OS on the given instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_os_add",
     """Installs an OS on the given instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_os_add",
-                                [self._InstDict(inst), reinstall])
+                                [self._InstDict(inst), reinstall, debug])
 
 
-  def call_instance_run_rename(self, node, inst, old_name):
+  def call_instance_run_rename(self, node, inst, old_name, debug):
     """Run the OS rename script for an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_run_rename",
     """Run the OS rename script for an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_run_rename",
-                                [self._InstDict(inst), old_name])
+                                [self._InstDict(inst), old_name, debug])
 
   def call_instance_info(self, node, instance, hname):
     """Returns information about a single instance.
 
   def call_instance_info(self, node, instance, hname):
     """Returns information about a single instance.
@@ -647,25 +702,8 @@ class RpcRunner(object):
         memory information
 
     """
         memory information
 
     """
-    retux = self._MultiNodeCall(node_list, "node_info",
-                                [vg_name, hypervisor_type])
-
-    for result in retux.itervalues():
-      if result.failed or not isinstance(result.data, dict):
-        result.data = {}
-      if result.offline:
-        log_name = None
-      else:
-        log_name = "call_node_info"
-
-      utils.CheckDict(result.data, {
-        'memory_total' : '-',
-        'memory_dom0' : '-',
-        'memory_free' : '-',
-        'vg_size' : 'node_unreachable',
-        'vg_free' : '-',
-        }, log_name)
-    return retux
+    return self._MultiNodeCall(node_list, "node_info",
+                               [vg_name, hypervisor_type])
 
   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
     """Add a node to the cluster.
 
   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
     """Add a node to the cluster.
@@ -686,14 +724,14 @@ class RpcRunner(object):
                                [checkdict, cluster_name])
 
   @classmethod
                                [checkdict, cluster_name])
 
   @classmethod
-  def call_node_start_master(cls, node, start_daemons):
+  def call_node_start_master(cls, node, start_daemons, no_voting):
     """Tells a node to activate itself as a master.
 
     This is a single-node call.
 
     """
     return cls._StaticSingleNodeCall(node, "node_start_master",
     """Tells a node to activate itself as a master.
 
     This is a single-node call.
 
     """
     return cls._StaticSingleNodeCall(node, "node_start_master",
-                                     [start_daemons])
+                                     [start_daemons, no_voting])
 
   @classmethod
   def call_node_stop_master(cls, node, stop_daemons):
 
   @classmethod
   def call_node_stop_master(cls, node, stop_daemons):
@@ -714,13 +752,14 @@ class RpcRunner(object):
     # TODO: should this method query down nodes?
     return cls._StaticMultiNodeCall(node_list, "master_info", [])
 
     # TODO: should this method query down nodes?
     return cls._StaticMultiNodeCall(node_list, "master_info", [])
 
-  def call_version(self, node_list):
+  @classmethod
+  def call_version(cls, node_list):
     """Query node version.
 
     This is a multi-node call.
 
     """
     """Query node version.
 
     This is a multi-node call.
 
     """
-    return self._MultiNodeCall(node_list, "version", [])
+    return cls._StaticMultiNodeCall(node_list, "version", [])
 
   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
     """Request creation of a given block device.
 
   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
     """Request creation of a given block device.
@@ -791,8 +830,12 @@ class RpcRunner(object):
     This is a single-node call.
 
     """
     This is a single-node call.
 
     """
-    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
-                                [dsk.ToDict() for dsk in disks])
+    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
+                                  [dsk.ToDict() for dsk in disks])
+    if not result.fail_msg:
+      result.payload = [objects.BlockDevStatus.FromDict(i)
+                        for i in result.payload]
+    return result
 
   def call_blockdev_find(self, node, disk):
     """Request identification of a given block device.
 
   def call_blockdev_find(self, node, disk):
     """Request identification of a given block device.
@@ -800,7 +843,10 @@ class RpcRunner(object):
     This is a single-node call.
 
     """
     This is a single-node call.
 
     """
-    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
+    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
+    if not result.fail_msg and result.payload is not None:
+      result.payload = objects.BlockDevStatus.FromDict(result.payload)
+    return result
 
   def call_blockdev_close(self, node, instance_name, disks):
     """Closes the given block devices.
 
   def call_blockdev_close(self, node, instance_name, disks):
     """Closes the given block devices.
@@ -811,6 +857,15 @@ class RpcRunner(object):
     params = [instance_name, [cf.ToDict() for cf in disks]]
     return self._SingleNodeCall(node, "blockdev_close", params)
 
     params = [instance_name, [cf.ToDict() for cf in disks]]
     return self._SingleNodeCall(node, "blockdev_close", params)
 
+  def call_blockdev_getsizes(self, node, disks):
+    """Returns the size of the given disks.
+
+    This is a single-node call.
+
+    """
+    params = [[cf.ToDict() for cf in disks]]
+    return self._SingleNodeCall(node, "blockdev_getsize", params)
+
   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
     """Disconnects the network of the given drbd devices.
 
   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
     """Disconnects the network of the given drbd devices.
 
@@ -881,13 +936,7 @@ class RpcRunner(object):
     This is a multi-node call.
 
     """
     This is a multi-node call.
 
     """
-    result = self._MultiNodeCall(node_list, "os_diagnose", [])
-
-    for node_result in result.values():
-      if not node_result.failed and node_result.data:
-        node_result.data = [objects.OS.FromDict(oss)
-                            for oss in node_result.data]
-    return result
+    return self._MultiNodeCall(node_list, "os_diagnose", [])
 
   def call_os_get(self, node, name):
     """Returns an OS definition.
 
   def call_os_get(self, node, name):
     """Returns an OS definition.
@@ -896,8 +945,8 @@ class RpcRunner(object):
 
     """
     result = self._SingleNodeCall(node, "os_get", [name])
 
     """
     result = self._SingleNodeCall(node, "os_get", [name])
-    if not result.failed and isinstance(result.data, dict):
-      result.data = objects.OS.FromDict(result.data)
+    if not result.fail_msg and isinstance(result.payload, dict):
+      result.payload = objects.OS.FromDict(result.payload)
     return result
 
   def call_hooks_runner(self, node_list, hpath, phase, env):
     return result
 
   def call_hooks_runner(self, node_list, hpath, phase, env):
@@ -934,24 +983,24 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "blockdev_grow",
                                 [cf_bdev.ToDict(), amount])
 
     return self._SingleNodeCall(node, "blockdev_grow",
                                 [cf_bdev.ToDict(), amount])
 
-  def call_blockdev_snapshot(self, node, cf_bdev):
-    """Request a snapshot of the given block device.
+  def call_blockdev_export(self, node, cf_bdev,
+                           dest_node, dest_path, cluster_name):
+    """Export a given disk to another node.
 
     This is a single-node call.
 
     """
 
     This is a single-node call.
 
     """
-    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
+    return self._SingleNodeCall(node, "blockdev_export",
+                                [cf_bdev.ToDict(), dest_node, dest_path,
+                                 cluster_name])
 
 
-  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
-                           cluster_name, idx):
-    """Request the export of a given snapshot.
+  def call_blockdev_snapshot(self, node, cf_bdev):
+    """Request a snapshot of the given block device.
 
     This is a single-node call.
 
     """
 
     This is a single-node call.
 
     """
-    return self._SingleNodeCall(node, "snapshot_export",
-                                [snap_bdev.ToDict(), dest_node,
-                                 self._InstDict(instance), cluster_name, idx])
+    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
 
   def call_finalize_export(self, node, instance, snap_disks):
     """Request the completion of an export operation.
 
   def call_finalize_export(self, node, instance, snap_disks):
     """Request the completion of an export operation.
@@ -963,7 +1012,10 @@ class RpcRunner(object):
     """
     flat_disks = []
     for disk in snap_disks:
     """
     flat_disks = []
     for disk in snap_disks:
-      flat_disks.append(disk.ToDict())
+      if isinstance(disk, bool):
+        flat_disks.append(disk)
+      else:
+        flat_disks.append(disk.ToDict())
 
     return self._SingleNodeCall(node, "finalize_export",
                                 [self._InstDict(instance), flat_disks])
 
     return self._SingleNodeCall(node, "finalize_export",
                                 [self._InstDict(instance), flat_disks])
@@ -976,17 +1028,6 @@ class RpcRunner(object):
     """
     return self._SingleNodeCall(node, "export_info", [path])
 
     """
     return self._SingleNodeCall(node, "export_info", [path])
 
-  def call_instance_os_import(self, node, inst, src_node, src_images,
-                              cluster_name):
-    """Request the import of a backup into an instance.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "instance_os_import",
-                                [self._InstDict(inst), src_node, src_images,
-                                 cluster_name])
-
   def call_export_list(self, node_list):
     """Gets the stored exports list.
 
   def call_export_list(self, node_list):
     """Gets the stored exports list.
 
@@ -1004,7 +1045,7 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "export_remove", [export])
 
   @classmethod
     return self._SingleNodeCall(node, "export_remove", [export])
 
   @classmethod
-  def call_node_leave_cluster(cls, node):
+  def call_node_leave_cluster(cls, node, modify_ssh_setup):
     """Requests a node to clean the cluster information it has.
 
     This will remove the configuration information from the ganeti data
     """Requests a node to clean the cluster information it has.
 
     This will remove the configuration information from the ganeti data
@@ -1013,7 +1054,8 @@ class RpcRunner(object):
     This is a single-node call.
 
     """
     This is a single-node call.
 
     """
-    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
+    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
+                                     [modify_ssh_setup])
 
   def call_node_volumes(self, node_list):
     """Gets all volumes on node(s).
 
   def call_node_volumes(self, node_list):
     """Gets all volumes on node(s).
@@ -1031,7 +1073,6 @@ class RpcRunner(object):
     """
     return self._SingleNodeCall(node, "node_demote_from_mc", [])
 
     """
     return self._SingleNodeCall(node, "node_demote_from_mc", [])
 
-
   def call_node_powercycle(self, node, hypervisor):
     """Tries to powercycle a node.
 
   def call_node_powercycle(self, node, hypervisor):
     """Tries to powercycle a node.
 
@@ -1040,7 +1081,6 @@ class RpcRunner(object):
     """
     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
 
     """
     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
 
-
   def call_test_delay(self, node_list, duration):
     """Sleep for a fixed time on given node(s).
 
   def call_test_delay(self, node_list, duration):
     """Sleep for a fixed time on given node(s).
 
@@ -1139,3 +1179,114 @@ class RpcRunner(object):
     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                                [hvname, hv_full])
     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                                [hvname, hv_full])
+
+  def call_x509_cert_create(self, node, validity):
+    """Creates a new X509 certificate for SSL/TLS.
+
+    This is a single-node call.
+
+    @type validity: int
+    @param validity: Validity in seconds
+
+    """
+    return self._SingleNodeCall(node, "x509_cert_create", [validity])
+
+  def call_x509_cert_remove(self, node, name):
+    """Removes a X509 certificate.
+
+    This is a single-node call.
+
+    @type name: string
+    @param name: Certificate name
+
+    """
+    return self._SingleNodeCall(node, "x509_cert_remove", [name])
+
+  def call_import_start(self, node, x509_key_name, source_x509_ca,
+                        instance, dest, dest_args):
+    """Starts a listener for an import.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type instance: C{objects.Instance}
+    @param instance: Instance object
+
+    """
+    return self._SingleNodeCall(node, "import_start",
+                                [x509_key_name, source_x509_ca,
+                                 self._InstDict(instance), dest,
+                                 _EncodeImportExportIO(dest, dest_args)])
+
+  def call_export_start(self, node, x509_key_name, dest_x509_ca, host, port,
+                        instance, source, source_args):
+    """Starts an export daemon.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type instance: C{objects.Instance}
+    @param instance: Instance object
+
+    """
+    return self._SingleNodeCall(node, "export_start",
+                                [x509_key_name, dest_x509_ca, host, port,
+                                 self._InstDict(instance), source,
+                                 _EncodeImportExportIO(source, source_args)])
+
+  def call_impexp_status(self, node, names):
+    """Gets the status of an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type names: List of strings
+    @param names: Import/export names
+    @rtype: List of L{objects.ImportExportStatus} instances
+    @return: Returns a list of the state of each named import/export or None if
+             a status couldn't be retrieved
+
+    """
+    result = self._SingleNodeCall(node, "impexp_status", [names])
+
+    if not result.fail_msg:
+      decoded = []
+
+      for i in result.payload:
+        if i is None:
+          decoded.append(None)
+          continue
+        decoded.append(objects.ImportExportStatus.FromDict(i))
+
+      result.payload = decoded
+
+    return result
+
+  def call_impexp_abort(self, node, name):
+    """Aborts an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type name: string
+    @param name: Import/export name
+
+    """
+    return self._SingleNodeCall(node, "impexp_abort", [name])
+
+  def call_impexp_cleanup(self, node, name):
+    """Cleans up after an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type name: string
+    @param name: Import/export name
+
+    """
+    return self._SingleNodeCall(node, "impexp_cleanup", [name])