Merge branch 'stable-2.6' into devel-2.6
[ganeti-local] / lib / rpc.py
index e46df3e..b06c3c7 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 
 """
 
 
 """
 
-# pylint: disable-msg=C0103,R0201,R0904
+# pylint: disable=C0103,R0201,R0904
 # C0103: Invalid name, since call_ are not valid
 # R0201: Method could be a function, we keep all rpcs instance methods
 # as not to change them back and forth between static/instance methods
 # if they need to start using instance attributes
 # R0904: Too many public methods
 
 # C0103: Invalid name, since call_ are not valid
 # R0201: Method could be a function, we keep all rpcs instance methods
 # as not to change them back and forth between static/instance methods
 # if they need to start using instance attributes
 # R0904: Too many public methods
 
-import os
-import socket
 import logging
 import zlib
 import base64
 import logging
 import zlib
 import base64
+import pycurl
+import threading
 
 from ganeti import utils
 from ganeti import objects
 
 from ganeti import utils
 from ganeti import objects
@@ -42,38 +42,116 @@ from ganeti import http
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
+from ganeti import netutils
+from ganeti import ssconf
+from ganeti import runtime
+from ganeti import compat
+from ganeti import rpc_defs
 
 
-import ganeti.http.client
+# Special module generated at build time
+from ganeti import _generated_rpc
 
 
+# pylint has a bug here, doesn't see this import
+import ganeti.http.client  # pylint: disable=W0611
 
 
-# Module level variable
-_http_manager = None
+
+# Timeout for connecting to nodes (seconds)
+_RPC_CONNECT_TIMEOUT = 5
+
+_RPC_CLIENT_HEADERS = [
+  "Content-type: %s" % http.HTTP_APP_JSON,
+  "Expect:",
+  ]
+
+# Various time constants for the timeout table
+_TMO_URGENT = 60 # one minute
+_TMO_FAST = 5 * 60 # five minutes
+_TMO_NORMAL = 15 * 60 # 15 minutes
+_TMO_SLOW = 3600 # one hour
+_TMO_4HRS = 4 * 3600
+_TMO_1DAY = 86400
+
+#: Special value to describe an offline host
+_OFFLINE = object()
 
 
 def Init():
   """Initializes the module-global HTTP client manager.
 
 
 
 def Init():
   """Initializes the module-global HTTP client manager.
 
-  Must be called before using any RPC function.
+  Must be called before using any RPC function and while exactly one thread is
+  running.
 
   """
 
   """
-  global _http_manager
+  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
+  # one thread running. This check is just a safety measure -- it doesn't
+  # cover all cases.
+  assert threading.activeCount() == 1, \
+         "Found more than one active thread when initializing pycURL"
 
 
-  assert not _http_manager, "RPC module initialized more than once"
+  logging.info("Using PycURL %s", pycurl.version)
 
 
-  _http_manager = http.client.HttpClientManager()
+  pycurl.global_init(pycurl.GLOBAL_ALL)
 
 
 def Shutdown():
   """Stops the module-global HTTP client manager.
 
 
 
 def Shutdown():
   """Stops the module-global HTTP client manager.
 
-  Must be called before quitting the program.
+  Must be called before quitting the program and while exactly one thread is
+  running.
 
   """
 
   """
-  global _http_manager
+  pycurl.global_cleanup()
+
 
 
-  if _http_manager:
-    _http_manager.Shutdown()
-    _http_manager = None
+def _ConfigRpcCurl(curl):
+  noded_cert = str(constants.NODED_CERT_FILE)
+
+  curl.setopt(pycurl.FOLLOWLOCATION, False)
+  curl.setopt(pycurl.CAINFO, noded_cert)
+  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
+  curl.setopt(pycurl.SSL_VERIFYPEER, True)
+  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
+  curl.setopt(pycurl.SSLCERT, noded_cert)
+  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
+  curl.setopt(pycurl.SSLKEY, noded_cert)
+  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
+
+
+def RunWithRPC(fn):
+  """RPC-wrapper decorator.
+
+  When applied to a function, it runs it with the RPC system
+  initialized, and it shutsdown the system afterwards. This means the
+  function must be called without RPC being initialized.
+
+  """
+  def wrapper(*args, **kwargs):
+    Init()
+    try:
+      return fn(*args, **kwargs)
+    finally:
+      Shutdown()
+  return wrapper
+
+
+def _Compress(data):
+  """Compresses a string for transport over RPC.
+
+  Small amounts of data are not compressed.
+
+  @type data: str
+  @param data: Data
+  @rtype: tuple
+  @return: Encoded data to send
+
+  """
+  # Small amounts of data are not compressed
+  if len(data) < 512:
+    return (constants.RPC_ENCODING_NONE, data)
+
+  # Compress with zlib and encode in base64
+  return (constants.RPC_ENCODING_ZLIB_BASE64,
+          base64.b64encode(zlib.compress(data, 3)))
 
 
 class RpcResult(object):
 
 
 class RpcResult(object):
@@ -83,47 +161,50 @@ class RpcResult(object):
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
-  @ivar data: the data payload, for successfull results, or None
-  @type failed: boolean
-  @ivar failed: whether the operation failed at transport level (not
-      application level on the remote node)
+  @ivar data: the data payload, for successful results, or None
   @ivar call: the name of the RPC call
   @ivar node: the name of the node to which we made the call
   @ivar offline: whether the operation failed because the node was
       offline, as opposed to actual failure; offline=True will always
       imply failed=True, in order to allow simpler checking if
       the user doesn't care about the exact failure mode
   @ivar call: the name of the RPC call
   @ivar node: the name of the node to which we made the call
   @ivar offline: whether the operation failed because the node was
       offline, as opposed to actual failure; offline=True will always
       imply failed=True, in order to allow simpler checking if
       the user doesn't care about the exact failure mode
-  @ivar error: the error message if the call failed
+  @ivar fail_msg: the error message if the call failed
 
   """
   def __init__(self, data=None, failed=False, offline=False,
                call=None, node=None):
 
   """
   def __init__(self, data=None, failed=False, offline=False,
                call=None, node=None):
-    self.failed = failed
     self.offline = offline
     self.call = call
     self.node = node
     self.offline = offline
     self.call = call
     self.node = node
+
     if offline:
     if offline:
-      self.failed = True
-      self.error = "Node is marked offline"
+      self.fail_msg = "Node is marked offline"
       self.data = self.payload = None
     elif failed:
       self.data = self.payload = None
     elif failed:
-      self.error = self._EnsureErr(data)
+      self.fail_msg = self._EnsureErr(data)
       self.data = self.payload = None
     else:
       self.data = data
       if not isinstance(self.data, (tuple, list)):
       self.data = self.payload = None
     else:
       self.data = data
       if not isinstance(self.data, (tuple, list)):
-        self.error = ("RPC layer error: invalid result type (%s)" %
-                      type(self.data))
+        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
+                         type(self.data))
+        self.payload = None
       elif len(data) != 2:
       elif len(data) != 2:
-        self.error = ("RPC layer error: invalid result length (%d), "
-                      "expected 2" % len(self.data))
+        self.fail_msg = ("RPC layer error: invalid result length (%d), "
+                         "expected 2" % len(self.data))
+        self.payload = None
       elif not self.data[0]:
       elif not self.data[0]:
-        self.error = self._EnsureErr(self.data[1])
+        self.fail_msg = self._EnsureErr(self.data[1])
+        self.payload = None
       else:
         # finally success
       else:
         # finally success
-        self.error = None
+        self.fail_msg = None
         self.payload = data[1]
 
         self.payload = data[1]
 
+    for attr_name in ["call", "data", "fail_msg",
+                      "node", "offline", "payload"]:
+      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
+
   @staticmethod
   def _EnsureErr(val):
     """Helper to ensure we return a 'True' value for error."""
   @staticmethod
   def _EnsureErr(val):
     """Helper to ensure we return a 'True' value for error."""
@@ -132,985 +213,626 @@ class RpcResult(object):
     else:
       return "No error information"
 
     else:
       return "No error information"
 
-  def Raise(self):
+  def Raise(self, msg, prereq=False, ecode=None):
     """If the result has failed, raise an OpExecError.
 
     This is used so that LU code doesn't have to check for each
     result, but instead can call this function.
 
     """
     """If the result has failed, raise an OpExecError.
 
     This is used so that LU code doesn't have to check for each
     result, but instead can call this function.
 
     """
-    if self.failed:
-      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
-                               (self.call, self.node, self.error))
-
-  def RemoteFailMsg(self):
-    """Check if the remote procedure failed.
-
-    @return: the fail_msg attribute
-
-    """
-    return self.error
+    if not self.fail_msg:
+      return
 
 
-
-class Client:
-  """RPC Client class.
-
-  This class, given a (remote) method name, a list of parameters and a
-  list of nodes, will contact (in parallel) all nodes, and return a
-  dict of results (key: node name, value: result).
-
-  One current bug is that generic failure is still signalled by
-  'False' result, which is not good. This overloading of values can
-  cause bugs.
+    if not msg: # one could pass None for default message
+      msg = ("Call '%s' to node '%s' has failed: %s" %
+             (self.call, self.node, self.fail_msg))
+    else:
+      msg = "%s: %s" % (msg, self.fail_msg)
+    if prereq:
+      ec = errors.OpPrereqError
+    else:
+      ec = errors.OpExecError
+    if ecode is not None:
+      args = (msg, ecode)
+    else:
+      args = (msg, )
+    raise ec(*args) # pylint: disable=W0142
+
+
+def _SsconfResolver(ssconf_ips, node_list, _,
+                    ssc=ssconf.SimpleStore,
+                    nslookup_fn=netutils.Hostname.GetIP):
+  """Return addresses for given node names.
+
+  @type ssconf_ips: bool
+  @param ssconf_ips: Use the ssconf IPs
+  @type node_list: list
+  @param node_list: List of node names
+  @type ssc: class
+  @param ssc: SimpleStore class that is used to obtain node->ip mappings
+  @type nslookup_fn: callable
+  @param nslookup_fn: function use to do NS lookup
+  @rtype: list of tuple; (string, string)
+  @return: List of tuples containing node name and IP address
 
   """
 
   """
-  def __init__(self, procedure, body, port):
-    self.procedure = procedure
-    self.body = body
-    self.port = port
-    self.nc = {}
-
-    self._ssl_params = \
-      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
-                         ssl_cert_path=constants.SSL_CERT_FILE)
+  ss = ssc()
+  family = ss.GetPrimaryIPFamily()
 
 
-  def ConnectList(self, node_list, address_list=None):
-    """Add a list of nodes to the target nodes.
+  if ssconf_ips:
+    iplist = ss.GetNodePrimaryIPList()
+    ipmap = dict(entry.split() for entry in iplist)
+  else:
+    ipmap = {}
 
 
-    @type node_list: list
-    @param node_list: the list of node names to connect
-    @type address_list: list or None
-    @keyword address_list: either None or a list with node addresses,
-        which must have the same length as the node list
+  result = []
+  for node in node_list:
+    ip = ipmap.get(node)
+    if ip is None:
+      ip = nslookup_fn(node, family=family)
+    result.append((node, ip))
 
 
-    """
-    if address_list is None:
-      address_list = [None for _ in node_list]
-    else:
-      assert len(node_list) == len(address_list), \
-             "Name and address lists should have the same length"
-    for node, address in zip(node_list, address_list):
-      self.ConnectNode(node, address)
+  return result
 
 
-  def ConnectNode(self, name, address=None):
-    """Add a node to the target list.
 
 
-    @type name: str
-    @param name: the node name
-    @type address: str
-    @keyword address: the node address, if known
+class _StaticResolver:
+  def __init__(self, addresses):
+    """Initializes this class.
 
     """
 
     """
-    if address is None:
-      address = name
+    self._addresses = addresses
 
 
-    self.nc[name] = \
-      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
-                                    "/%s" % self.procedure,
-                                    post_data=self.body,
-                                    ssl_params=self._ssl_params,
-                                    ssl_verify_peer=True)
-
-  def GetResults(self):
-    """Call nodes and return results.
-
-    @rtype: list
-    @return: List of RPC results
+  def __call__(self, hosts, _):
+    """Returns static addresses for hosts.
 
     """
 
     """
-    assert _http_manager, "RPC module not intialized"
-
-    _http_manager.ExecRequests(self.nc.values())
-
-    results = {}
+    assert len(hosts) == len(self._addresses)
+    return zip(hosts, self._addresses)
 
 
-    for name, req in self.nc.iteritems():
-      if req.success and req.resp_status_code == http.HTTP_OK:
-        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
-                                  node=name, call=self.procedure)
-        continue
 
 
-      # TODO: Better error reporting
-      if req.error:
-        msg = req.error
-      else:
-        msg = req.resp_body
+def _CheckConfigNode(name, node, accept_offline_node):
+  """Checks if a node is online.
 
 
-      logging.error("RPC error in %s from node %s: %s",
-                    self.procedure, name, msg)
-      results[name] = RpcResult(data=msg, failed=True, node=name,
-                                call=self.procedure)
+  @type name: string
+  @param name: Node name
+  @type node: L{objects.Node} or None
+  @param node: Node object
 
 
-    return results
+  """
+  if node is None:
+    # Depend on DNS for name resolution
+    ip = name
+  elif node.offline and not accept_offline_node:
+    ip = _OFFLINE
+  else:
+    ip = node.primary_ip
+  return (name, ip)
 
 
 
 
-class RpcRunner(object):
-  """RPC runner class"""
+def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
+  """Calculate node addresses using configuration.
 
 
-  def __init__(self, cfg):
-    """Initialized the rpc runner.
+  """
+  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
 
 
-    @type cfg:  C{config.ConfigWriter}
-    @param cfg: the configuration object that will be used to get data
-                about the cluster
+  assert accept_offline_node or opts is None, "Unknown option"
 
 
-    """
-    self._cfg = cfg
-    self.port = utils.GetNodeDaemonPort()
+  # Special case for single-host lookups
+  if len(hosts) == 1:
+    (name, ) = hosts
+    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
+  else:
+    all_nodes = all_nodes_fn()
+    return [_CheckConfigNode(name, all_nodes.get(name, None),
+                             accept_offline_node)
+            for name in hosts]
 
 
-  def _InstDict(self, instance, hvp=None, bep=None):
-    """Convert the given instance to a dict.
 
 
-    This is done via the instance's ToDict() method and additionally
-    we fill the hvparams with the cluster defaults.
+class _RpcProcessor:
+  def __init__(self, resolver, port, lock_monitor_cb=None):
+    """Initializes this class.
 
 
-    @type instance: L{objects.Instance}
-    @param instance: an Instance object
-    @type hvp: dict or None
-    @param hvp: a dictionary with overriden hypervisor parameters
-    @type bep: dict or None
-    @param bep: a dictionary with overriden backend parameters
-    @rtype: dict
-    @return: the instance dict, with the hvparams filled with the
-        cluster defaults
+    @param resolver: callable accepting a list of hostnames, returning a list
+      of tuples containing name and IP address (IP address can be the name or
+      the special value L{_OFFLINE} to mark offline machines)
+    @type port: int
+    @param port: TCP port
+    @param lock_monitor_cb: Callable for registering with lock monitor
 
     """
 
     """
-    idict = instance.ToDict()
-    cluster = self._cfg.GetClusterInfo()
-    idict["hvparams"] = cluster.FillHV(instance)
-    if hvp is not None:
-      idict["hvparams"].update(hvp)
-    idict["beparams"] = cluster.FillBE(instance)
-    if bep is not None:
-      idict["beparams"].update(bep)
-    for nic in idict["nics"]:
-      nic['nicparams'] = objects.FillDict(
-        cluster.nicparams[constants.PP_DEFAULT],
-        nic['nicparams'])
-    return idict
+    self._resolver = resolver
+    self._port = port
+    self._lock_monitor_cb = lock_monitor_cb
 
 
-  def _ConnectList(self, client, node_list, call):
-    """Helper for computing node addresses.
+  @staticmethod
+  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
+    """Prepares requests by sorting offline hosts into separate list.
 
 
-    @type client: L{Client}
-    @param client: a C{Client} instance
-    @type node_list: list
-    @param node_list: the node list we should connect
-    @type call: string
-    @param call: the name of the remote procedure call, for filling in
-        correctly any eventual offline nodes' results
+    @type body: dict
+    @param body: a dictionary with per-host body data
 
     """
 
     """
-    all_nodes = self._cfg.GetAllNodesInfo()
-    name_list = []
-    addr_list = []
-    skip_dict = {}
-    for node in node_list:
-      if node in all_nodes:
-        if all_nodes[node].offline:
-          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
-          continue
-        val = all_nodes[node].primary_ip
+    results = {}
+    requests = {}
+
+    assert isinstance(body, dict)
+    assert len(body) == len(hosts)
+    assert compat.all(isinstance(v, str) for v in body.values())
+    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
+        "%s != %s" % (hosts, body.keys())
+
+    for (name, ip) in hosts:
+      if ip is _OFFLINE:
+        # Node is marked as offline
+        results[name] = RpcResult(node=name, offline=True, call=procedure)
       else:
       else:
-        val = None
-      addr_list.append(val)
-      name_list.append(node)
-    if name_list:
-      client.ConnectList(name_list, address_list=addr_list)
-    return skip_dict
-
-  def _ConnectNode(self, client, node, call):
-    """Helper for computing one node's address.
-
-    @type client: L{Client}
-    @param client: a C{Client} instance
-    @type node: str
-    @param node: the node we should connect
-    @type call: string
-    @param call: the name of the remote procedure call, for filling in
-        correctly any eventual offline nodes' results
-
-    """
-    node_info = self._cfg.GetNodeInfo(node)
-    if node_info is not None:
-      if node_info.offline:
-        return RpcResult(node=node, offline=True, call=call)
-      addr = node_info.primary_ip
-    else:
-      addr = None
-    client.ConnectNode(node, address=addr)
-
-  def _MultiNodeCall(self, node_list, procedure, args):
-    """Helper for making a multi-node call
-
-    """
-    body = serializer.DumpJson(args, indent=False)
-    c = Client(procedure, body, self.port)
-    skip_dict = self._ConnectList(c, node_list, procedure)
-    skip_dict.update(c.GetResults())
-    return skip_dict
-
-  @classmethod
-  def _StaticMultiNodeCall(cls, node_list, procedure, args,
-                           address_list=None):
-    """Helper for making a multi-node static call
+        requests[name] = \
+          http.client.HttpClientRequest(str(ip), port,
+                                        http.HTTP_POST, str("/%s" % procedure),
+                                        headers=_RPC_CLIENT_HEADERS,
+                                        post_data=body[name],
+                                        read_timeout=read_timeout,
+                                        nicename="%s/%s" % (name, procedure),
+                                        curl_config_fn=_ConfigRpcCurl)
 
 
-    """
-    body = serializer.DumpJson(args, indent=False)
-    c = Client(procedure, body, utils.GetNodeDaemonPort())
-    c.ConnectList(node_list, address_list=address_list)
-    return c.GetResults()
-
-  def _SingleNodeCall(self, node, procedure, args):
-    """Helper for making a single-node call
-
-    """
-    body = serializer.DumpJson(args, indent=False)
-    c = Client(procedure, body, self.port)
-    result = self._ConnectNode(c, node, procedure)
-    if result is None:
-      # we did connect, node is not offline
-      result = c.GetResults()[node]
-    return result
-
-  @classmethod
-  def _StaticSingleNodeCall(cls, node, procedure, args):
-    """Helper for making a single-node static call
-
-    """
-    body = serializer.DumpJson(args, indent=False)
-    c = Client(procedure, body, utils.GetNodeDaemonPort())
-    c.ConnectNode(node)
-    return c.GetResults()[node]
+    return (results, requests)
 
   @staticmethod
 
   @staticmethod
-  def _Compress(data):
-    """Compresses a string for transport over RPC.
-
-    Small amounts of data are not compressed.
-
-    @type data: str
-    @param data: Data
-    @rtype: tuple
-    @return: Encoded data to send
-
-    """
-    # Small amounts of data are not compressed
-    if len(data) < 512:
-      return (constants.RPC_ENCODING_NONE, data)
-
-    # Compress with zlib and encode in base64
-    return (constants.RPC_ENCODING_ZLIB_BASE64,
-            base64.b64encode(zlib.compress(data, 3)))
-
-  #
-  # Begin RPC calls
-  #
-
-  def call_volume_list(self, node_list, vg_name):
-    """Gets the logical volumes present in a given volume group.
-
-    This is a multi-node call.
-
-    """
-    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
-
-  def call_vg_list(self, node_list):
-    """Gets the volume group list.
-
-    This is a multi-node call.
-
-    """
-    return self._MultiNodeCall(node_list, "vg_list", [])
-
-  def call_bridges_exist(self, node, bridges_list):
-    """Checks if a node has all the bridges given.
-
-    This method checks if all bridges given in the bridges_list are
-    present on the remote node, so that an instance that uses interfaces
-    on those bridges can be started.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
-
-  def call_instance_start(self, node, instance, hvp, bep):
-    """Starts an instance.
-
-    This is a single-node call.
-
-    """
-    idict = self._InstDict(instance, hvp=hvp, bep=bep)
-    return self._SingleNodeCall(node, "instance_start", [idict])
-
-  def call_instance_shutdown(self, node, instance):
-    """Stops an instance.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "instance_shutdown",
-                                [self._InstDict(instance)])
-
-  def call_migration_info(self, node, instance):
-    """Gather the information necessary to prepare an instance migration.
-
-    This is a single-node call.
-
-    @type node: string
-    @param node: the node on which the instance is currently running
-    @type instance: C{objects.Instance}
-    @param instance: the instance definition
-
-    """
-    return self._SingleNodeCall(node, "migration_info",
-                                [self._InstDict(instance)])
-
-  def call_accept_instance(self, node, instance, info, target):
-    """Prepare a node to accept an instance.
-
-    This is a single-node call.
-
-    @type node: string
-    @param node: the target node for the migration
-    @type instance: C{objects.Instance}
-    @param instance: the instance definition
-    @type info: opaque/hypervisor specific (string/data)
-    @param info: result for the call_migration_info call
-    @type target: string
-    @param target: target hostname (usually ip address) (on the node itself)
-
-    """
-    return self._SingleNodeCall(node, "accept_instance",
-                                [self._InstDict(instance), info, target])
-
-  def call_finalize_migration(self, node, instance, info, success):
-    """Finalize any target-node migration specific operation.
-
-    This is called both in case of a successful migration and in case of error
-    (in which case it should abort the migration).
-
-    This is a single-node call.
-
-    @type node: string
-    @param node: the target node for the migration
-    @type instance: C{objects.Instance}
-    @param instance: the instance definition
-    @type info: opaque/hypervisor specific (string/data)
-    @param info: result for the call_migration_info call
-    @type success: boolean
-    @param success: whether the migration was a success or a failure
-
-    """
-    return self._SingleNodeCall(node, "finalize_migration",
-                                [self._InstDict(instance), info, success])
-
-  def call_instance_migrate(self, node, instance, target, live):
-    """Migrate an instance.
-
-    This is a single-node call.
-
-    @type node: string
-    @param node: the node on which the instance is currently running
-    @type instance: C{objects.Instance}
-    @param instance: the instance definition
-    @type target: string
-    @param target: the target node name
-    @type live: boolean
-    @param live: whether the migration should be done live or not (the
-        interpretation of this parameter is left to the hypervisor)
-
-    """
-    return self._SingleNodeCall(node, "instance_migrate",
-                                [self._InstDict(instance), target, live])
-
-  def call_instance_reboot(self, node, instance, reboot_type):
-    """Reboots an instance.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "instance_reboot",
-                                [self._InstDict(instance), reboot_type])
-
-  def call_instance_os_add(self, node, inst, reinstall):
-    """Installs an OS on the given instance.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "instance_os_add",
-                                [self._InstDict(inst), reinstall])
-
-  def call_instance_run_rename(self, node, inst, old_name):
-    """Run the OS rename script for an instance.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "instance_run_rename",
-                                [self._InstDict(inst), old_name])
-
-  def call_instance_info(self, node, instance, hname):
-    """Returns information about a single instance.
-
-    This is a single-node call.
-
-    @type node: list
-    @param node: the list of nodes to query
-    @type instance: string
-    @param instance: the instance name
-    @type hname: string
-    @param hname: the hypervisor type of the instance
-
-    """
-    return self._SingleNodeCall(node, "instance_info", [instance, hname])
-
-  def call_instance_migratable(self, node, instance):
-    """Checks whether the given instance can be migrated.
-
-    This is a single-node call.
-
-    @param node: the node to query
-    @type instance: L{objects.Instance}
-    @param instance: the instance to check
-
+  def _CombineResults(results, requests, procedure):
+    """Combines pre-computed results for offline hosts with actual call results.
 
     """
 
     """
-    return self._SingleNodeCall(node, "instance_migratable",
-                                [self._InstDict(instance)])
-
-  def call_all_instances_info(self, node_list, hypervisor_list):
-    """Returns information about all instances on the given nodes.
-
-    This is a multi-node call.
-
-    @type node_list: list
-    @param node_list: the list of nodes to query
-    @type hypervisor_list: list
-    @param hypervisor_list: the hypervisors to query for instances
-
-    """
-    return self._MultiNodeCall(node_list, "all_instances_info",
-                               [hypervisor_list])
-
-  def call_instance_list(self, node_list, hypervisor_list):
-    """Returns the list of running instances on a given node.
-
-    This is a multi-node call.
-
-    @type node_list: list
-    @param node_list: the list of nodes to query
-    @type hypervisor_list: list
-    @param hypervisor_list: the hypervisors to query for instances
-
-    """
-    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
-
-  def call_node_tcp_ping(self, node, source, target, port, timeout,
-                         live_port_needed):
-    """Do a TcpPing on the remote node
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "node_tcp_ping",
-                                [source, target, port, timeout,
-                                 live_port_needed])
-
-  def call_node_has_ip_address(self, node, address):
-    """Checks if a node has the given IP address.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "node_has_ip_address", [address])
-
-  def call_node_info(self, node_list, vg_name, hypervisor_type):
-    """Return node information.
-
-    This will return memory information and volume group size and free
-    space.
-
-    This is a multi-node call.
-
-    @type node_list: list
-    @param node_list: the list of nodes to query
-    @type vg_name: C{string}
-    @param vg_name: the name of the volume group to ask for disk space
-        information
-    @type hypervisor_type: C{str}
-    @param hypervisor_type: the name of the hypervisor to ask for
-        memory information
-
-    """
-    return self._MultiNodeCall(node_list, "node_info",
-                               [vg_name, hypervisor_type])
-
-  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
-    """Add a node to the cluster.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "node_add",
-                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
-
-  def call_node_verify(self, node_list, checkdict, cluster_name):
-    """Request verification of given parameters.
-
-    This is a multi-node call.
-
-    """
-    return self._MultiNodeCall(node_list, "node_verify",
-                               [checkdict, cluster_name])
-
-  @classmethod
-  def call_node_start_master(cls, node, start_daemons):
-    """Tells a node to activate itself as a master.
-
-    This is a single-node call.
-
-    """
-    return cls._StaticSingleNodeCall(node, "node_start_master",
-                                     [start_daemons])
-
-  @classmethod
-  def call_node_stop_master(cls, node, stop_daemons):
-    """Tells a node to demote itself from master status.
-
-    This is a single-node call.
-
-    """
-    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
-
-  @classmethod
-  def call_master_info(cls, node_list):
-    """Query master info.
-
-    This is a multi-node call.
-
-    """
-    # TODO: should this method query down nodes?
-    return cls._StaticMultiNodeCall(node_list, "master_info", [])
-
-  def call_version(self, node_list):
-    """Query node version.
-
-    This is a multi-node call.
-
-    """
-    return self._MultiNodeCall(node_list, "version", [])
-
-  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
-    """Request creation of a given block device.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "blockdev_create",
-                                [bdev.ToDict(), size, owner, on_primary, info])
-
-  def call_blockdev_remove(self, node, bdev):
-    """Request removal of a given block device.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
-
-  def call_blockdev_rename(self, node, devlist):
-    """Request rename of the given block devices.
-
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "blockdev_rename",
-                                [(d.ToDict(), uid) for d, uid in devlist])
+    for name, req in requests.items():
+      if req.success and req.resp_status_code == http.HTTP_OK:
+        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
+                                node=name, call=procedure)
+      else:
+        # TODO: Better error reporting
+        if req.error:
+          msg = req.error
+        else:
+          msg = req.resp_body
 
 
-  def call_blockdev_assemble(self, node, disk, owner, on_primary):
-    """Request assembling of a given block device.
+        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
+        host_result = RpcResult(data=msg, failed=True, node=name,
+                                call=procedure)
 
 
-    This is a single-node call.
+      results[name] = host_result
 
 
-    """
-    return self._SingleNodeCall(node, "blockdev_assemble",
-                                [disk.ToDict(), owner, on_primary])
+    return results
 
 
-  def call_blockdev_shutdown(self, node, disk):
-    """Request shutdown of a given block device.
+  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
+               _req_process_fn=None):
+    """Makes an RPC request to a number of nodes.
 
 
-    This is a single-node call.
+    @type hosts: sequence
+    @param hosts: Hostnames
+    @type procedure: string
+    @param procedure: Request path
+    @type body: dictionary
+    @param body: dictionary with request bodies per host
+    @type read_timeout: int or None
+    @param read_timeout: Read timeout for request
 
     """
 
     """
-    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
+    assert read_timeout is not None, \
+      "Missing RPC read timeout for procedure '%s'" % procedure
 
 
-  def call_blockdev_addchildren(self, node, bdev, ndevs):
-    """Request adding a list of children to a (mirroring) device.
+    if _req_process_fn is None:
+      _req_process_fn = http.client.ProcessRequests
 
 
-    This is a single-node call.
+    (results, requests) = \
+      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
+                            procedure, body, read_timeout)
 
 
-    """
-    return self._SingleNodeCall(node, "blockdev_addchildren",
-                                [bdev.ToDict(),
-                                 [disk.ToDict() for disk in ndevs]])
+    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
 
 
-  def call_blockdev_removechildren(self, node, bdev, ndevs):
-    """Request removing a list of children from a (mirroring) device.
+    assert not frozenset(results).intersection(requests)
 
 
-    This is a single-node call.
-
-    """
-    return self._SingleNodeCall(node, "blockdev_removechildren",
-                                [bdev.ToDict(),
-                                 [disk.ToDict() for disk in ndevs]])
+    return self._CombineResults(results, requests, procedure)
 
 
-  def call_blockdev_getmirrorstatus(self, node, disks):
-    """Request status of a (mirroring) device.
 
 
-    This is a single-node call.
+class _RpcClientBase:
+  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
+               _req_process_fn=None):
+    """Initializes this class.
 
     """
 
     """
-    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
-                                [dsk.ToDict() for dsk in disks])
+    proc = _RpcProcessor(resolver,
+                         netutils.GetDaemonPort(constants.NODED),
+                         lock_monitor_cb=lock_monitor_cb)
+    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
+    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
 
 
-  def call_blockdev_find(self, node, disk):
-    """Request identification of a given block device.
-
-    This is a single-node call.
+  @staticmethod
+  def _EncodeArg(encoder_fn, (argkind, value)):
+    """Encode argument.
 
     """
 
     """
-    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
-
-  def call_blockdev_close(self, node, instance_name, disks):
-    """Closes the given block devices.
+    if argkind is None:
+      return value
+    else:
+      return encoder_fn(argkind)(value)
 
 
-    This is a single-node call.
+  def _Call(self, cdef, node_list, args):
+    """Entry point for automatically generated RPC wrappers.
 
     """
 
     """
-    params = [instance_name, [cf.ToDict() for cf in disks]]
-    return self._SingleNodeCall(node, "blockdev_close", params)
+    (procedure, _, resolver_opts, timeout, argdefs,
+     prep_fn, postproc_fn, _) = cdef
 
 
-  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
-    """Disconnects the network of the given drbd devices.
+    if callable(timeout):
+      read_timeout = timeout(args)
+    else:
+      read_timeout = timeout
 
 
-    This is a multi-node call.
+    if callable(resolver_opts):
+      req_resolver_opts = resolver_opts(args)
+    else:
+      req_resolver_opts = resolver_opts
 
 
-    """
-    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
-                               [nodes_ip, [cf.ToDict() for cf in disks]])
+    if len(args) != len(argdefs):
+      raise errors.ProgrammerError("Number of passed arguments doesn't match")
 
 
-  def call_drbd_attach_net(self, node_list, nodes_ip,
-                           disks, instance_name, multimaster):
-    """Disconnects the given drbd devices.
+    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
+    if prep_fn is None:
+      # for a no-op prep_fn, we serialise the body once, and then we
+      # reuse it in the dictionary values
+      body = serializer.DumpJson(enc_args)
+      pnbody = dict((n, body) for n in node_list)
+    else:
+      # for a custom prep_fn, we pass the encoded arguments and the
+      # node name to the prep_fn, and we serialise its return value
+      assert callable(prep_fn)
+      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
+                    for n in node_list)
+
+    result = self._proc(node_list, procedure, pnbody, read_timeout,
+                        req_resolver_opts)
+
+    if postproc_fn:
+      return dict(map(lambda (key, value): (key, postproc_fn(value)),
+                      result.items()))
+    else:
+      return result
 
 
-    This is a multi-node call.
 
 
-    """
-    return self._MultiNodeCall(node_list, "drbd_attach_net",
-                               [nodes_ip, [cf.ToDict() for cf in disks],
-                                instance_name, multimaster])
+def _ObjectToDict(value):
+  """Converts an object to a dictionary.
 
 
-  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
-    """Waits for the synchronization of drbd devices is complete.
+  @note: See L{objects}.
 
 
-    This is a multi-node call.
+  """
+  return value.ToDict()
 
 
-    """
-    return self._MultiNodeCall(node_list, "drbd_wait_sync",
-                               [nodes_ip, [cf.ToDict() for cf in disks]])
 
 
-  @classmethod
-  def call_upload_file(cls, node_list, file_name, address_list=None):
-    """Upload a file.
+def _ObjectListToDict(value):
+  """Converts a list of L{objects} to dictionaries.
 
 
-    The node will refuse the operation in case the file is not on the
-    approved file list.
+  """
+  return map(_ObjectToDict, value)
 
 
-    This is a multi-node call.
 
 
-    @type node_list: list
-    @param node_list: the list of node names to upload to
-    @type file_name: str
-    @param file_name: the filename to upload
-    @type address_list: list or None
-    @keyword address_list: an optional list of node addresses, in order
-        to optimize the RPC speed
+def _EncodeNodeToDiskDict(value):
+  """Encodes a dictionary with node name as key and disk objects as values.
 
 
-    """
-    file_contents = utils.ReadFile(file_name)
-    data = cls._Compress(file_contents)
-    st = os.stat(file_name)
-    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
-              st.st_atime, st.st_mtime]
-    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
-                                    address_list=address_list)
+  """
+  return dict((name, _ObjectListToDict(disks))
+              for name, disks in value.items())
 
 
-  @classmethod
-  def call_write_ssconf_files(cls, node_list, values):
-    """Write ssconf files.
 
 
-    This is a multi-node call.
+def _PrepareFileUpload(getents_fn, filename):
+  """Loads a file and prepares it for an upload to nodes.
 
 
-    """
-    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
+  """
+  statcb = utils.FileStatHelper()
+  data = _Compress(utils.ReadFile(filename, preread=statcb))
+  st = statcb.st
 
 
-  def call_os_diagnose(self, node_list):
-    """Request a diagnose of OS definitions.
+  if getents_fn is None:
+    getents_fn = runtime.GetEnts
 
 
-    This is a multi-node call.
+  getents = getents_fn()
 
 
-    """
-    return self._MultiNodeCall(node_list, "os_diagnose", [])
+  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
+          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
 
 
-  def call_os_get(self, node, name):
-    """Returns an OS definition.
 
 
-    This is a single-node call.
+def _PrepareFinalizeExportDisks(snap_disks):
+  """Encodes disks for finalizing export.
 
 
-    """
-    result = self._SingleNodeCall(node, "os_get", [name])
-    if not result.failed and isinstance(result.data, dict):
-      result.data = objects.OS.FromDict(result.data)
-    return result
+  """
+  flat_disks = []
 
 
-  def call_hooks_runner(self, node_list, hpath, phase, env):
-    """Call the hooks runner.
+  for disk in snap_disks:
+    if isinstance(disk, bool):
+      flat_disks.append(disk)
+    else:
+      flat_disks.append(disk.ToDict())
 
 
-    Args:
-      - op: the OpCode instance
-      - env: a dictionary with the environment
+  return flat_disks
 
 
-    This is a multi-node call.
 
 
-    """
-    params = [hpath, phase, env]
-    return self._MultiNodeCall(node_list, "hooks_runner", params)
+def _EncodeImportExportIO((ieio, ieioargs)):
+  """Encodes import/export I/O information.
 
 
-  def call_iallocator_runner(self, node, name, idata):
-    """Call an iallocator on a remote node
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    return (ieio, (ieioargs[0].ToDict(), ))
 
 
-    Args:
-      - name: the iallocator name
-      - input: the json-encoded input string
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
 
 
-    This is a single-node call.
+  return (ieio, ieioargs)
 
 
-    """
-    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
 
 
-  def call_blockdev_grow(self, node, cf_bdev, amount):
-    """Request a snapshot of the given block device.
+def _EncodeBlockdevRename(value):
+  """Encodes information for renaming block devices.
 
 
-    This is a single-node call.
+  """
+  return [(d.ToDict(), uid) for d, uid in value]
 
 
-    """
-    return self._SingleNodeCall(node, "blockdev_grow",
-                                [cf_bdev.ToDict(), amount])
 
 
-  def call_blockdev_snapshot(self, node, cf_bdev):
-    """Request a snapshot of the given block device.
+def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
+  """Annotates just DRBD disks layouts.
 
 
-    This is a single-node call.
+  """
+  assert disk.dev_type == constants.LD_DRBD8
 
 
-    """
-    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
+  disk.params = objects.FillDict(drbd_params, disk.params)
+  (dev_data, dev_meta) = disk.children
+  dev_data.params = objects.FillDict(data_params, dev_data.params)
+  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
 
 
-  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
-                           cluster_name, idx):
-    """Request the export of a given snapshot.
+  return disk
 
 
-    This is a single-node call.
 
 
-    """
-    return self._SingleNodeCall(node, "snapshot_export",
-                                [snap_bdev.ToDict(), dest_node,
-                                 self._InstDict(instance), cluster_name, idx])
+def _AnnotateDParamsGeneric(disk, (params, )):
+  """Generic disk parameter annotation routine.
 
 
-  def call_finalize_export(self, node, instance, snap_disks):
-    """Request the completion of an export operation.
+  """
+  assert disk.dev_type != constants.LD_DRBD8
 
 
-    This writes the export config file, etc.
+  disk.params = objects.FillDict(params, disk.params)
 
 
-    This is a single-node call.
+  return disk
 
 
-    """
-    flat_disks = []
-    for disk in snap_disks:
-      flat_disks.append(disk.ToDict())
 
 
-    return self._SingleNodeCall(node, "finalize_export",
-                                [self._InstDict(instance), flat_disks])
+def AnnotateDiskParams(template, disks, disk_params):
+  """Annotates the disk objects with the disk parameters.
 
 
-  def call_export_info(self, node, path):
-    """Queries the export information in a given path.
+  @param template: The disk template used
+  @param disks: The list of disks objects to annotate
+  @param disk_params: The disk paramaters for annotation
+  @returns: A list of disk objects annotated
 
 
-    This is a single-node call.
+  """
+  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
 
 
-    """
-    return self._SingleNodeCall(node, "export_info", [path])
+  if template == constants.DT_DRBD8:
+    annotation_fn = _AnnotateDParamsDRBD
+  elif template == constants.DT_DISKLESS:
+    annotation_fn = lambda disk, _: disk
+  else:
+    annotation_fn = _AnnotateDParamsGeneric
 
 
-  def call_instance_os_import(self, node, inst, src_node, src_images,
-                              cluster_name):
-    """Request the import of a backup into an instance.
+  new_disks = []
+  for disk in disks:
+    new_disks.append(annotation_fn(disk.Copy(), ld_params))
 
 
-    This is a single-node call.
+  return new_disks
 
 
-    """
-    return self._SingleNodeCall(node, "instance_os_import",
-                                [self._InstDict(inst), src_node, src_images,
-                                 cluster_name])
 
 
-  def call_export_list(self, node_list):
-    """Gets the stored exports list.
+#: Generic encoders
+_ENCODERS = {
+  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
+  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
+  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
+  rpc_defs.ED_COMPRESS: _Compress,
+  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
+  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
+  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
+  }
 
 
-    This is a multi-node call.
 
 
-    """
-    return self._MultiNodeCall(node_list, "export_list", [])
+class RpcRunner(_RpcClientBase,
+                _generated_rpc.RpcClientDefault,
+                _generated_rpc.RpcClientBootstrap,
+                _generated_rpc.RpcClientDnsOnly,
+                _generated_rpc.RpcClientConfig):
+  """RPC runner class.
 
 
-  def call_export_remove(self, node, export):
-    """Requests removal of a given export.
+  """
+  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
+    """Initialized the RPC runner.
 
 
-    This is a single-node call.
+    @type cfg: L{config.ConfigWriter}
+    @param cfg: Configuration
+    @type lock_monitor_cb: callable
+    @param lock_monitor_cb: Lock monitor callback
 
     """
 
     """
-    return self._SingleNodeCall(node, "export_remove", [export])
+    self._cfg = cfg
 
 
-  @classmethod
-  def call_node_leave_cluster(cls, node):
-    """Requests a node to clean the cluster information it has.
+    encoders = _ENCODERS.copy()
+
+    encoders.update({
+      # Encoders requiring configuration object
+      rpc_defs.ED_INST_DICT: self._InstDict,
+      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
+      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
+
+      # Encoders annotating disk parameters
+      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
+      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
+
+      # Encoders with special requirements
+      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+      })
+
+    # Resolver using configuration
+    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
+                              cfg.GetAllNodesInfo)
+
+    # Pylint doesn't recognize multiple inheritance properly, see
+    # <http://www.logilab.org/ticket/36586> and
+    # <http://www.logilab.org/ticket/35642>
+    # pylint: disable=W0233
+    _RpcClientBase.__init__(self, resolver, encoders.get,
+                            lock_monitor_cb=lock_monitor_cb,
+                            _req_process_fn=_req_process_fn)
+    _generated_rpc.RpcClientConfig.__init__(self)
+    _generated_rpc.RpcClientBootstrap.__init__(self)
+    _generated_rpc.RpcClientDnsOnly.__init__(self)
+    _generated_rpc.RpcClientDefault.__init__(self)
+
+  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
+    """Convert the given instance to a dict.
 
 
-    This will remove the configuration information from the ganeti data
-    dir.
+    This is done via the instance's ToDict() method and additionally
+    we fill the hvparams with the cluster defaults.
 
 
-    This is a single-node call.
+    @type instance: L{objects.Instance}
+    @param instance: an Instance object
+    @type hvp: dict or None
+    @param hvp: a dictionary with overridden hypervisor parameters
+    @type bep: dict or None
+    @param bep: a dictionary with overridden backend parameters
+    @type osp: dict or None
+    @param osp: a dictionary with overridden os parameters
+    @rtype: dict
+    @return: the instance dict, with the hvparams filled with the
+        cluster defaults
 
     """
 
     """
-    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
-
-  def call_node_volumes(self, node_list):
-    """Gets all volumes on node(s).
+    idict = instance.ToDict()
+    cluster = self._cfg.GetClusterInfo()
+    idict["hvparams"] = cluster.FillHV(instance)
+    if hvp is not None:
+      idict["hvparams"].update(hvp)
+    idict["beparams"] = cluster.FillBE(instance)
+    if bep is not None:
+      idict["beparams"].update(bep)
+    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
+    if osp is not None:
+      idict["osparams"].update(osp)
+    for nic in idict["nics"]:
+      nic["nicparams"] = objects.FillDict(
+        cluster.nicparams[constants.PP_DEFAULT],
+        nic["nicparams"])
+    idict["disks"] = self._DisksDictDP((instance.disks, instance))
+    return idict
 
 
-    This is a multi-node call.
+  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
+    """Wrapper for L{_InstDict}.
 
     """
 
     """
-    return self._MultiNodeCall(node_list, "node_volumes", [])
-
-  def call_node_demote_from_mc(self, node):
-    """Demote a node from the master candidate role.
+    return self._InstDict(instance, hvp=hvp, bep=bep)
 
 
-    This is a single-node call.
+  def _InstDictOspDp(self, (instance, osparams)):
+    """Wrapper for L{_InstDict}.
 
     """
 
     """
-    return self._SingleNodeCall(node, "node_demote_from_mc", [])
+    return self._InstDict(instance, osp=osparams)
 
 
-
-  def call_node_powercycle(self, node, hypervisor):
-    """Tries to powercycle a node.
-
-    This is a single-node call.
+  def _DisksDictDP(self, (disks, instance)):
+    """Wrapper for L{AnnotateDiskParams}.
 
     """
 
     """
-    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
-
-
-  def call_test_delay(self, node_list, duration):
-    """Sleep for a fixed time on given node(s).
+    diskparams = self._cfg.GetInstanceDiskParams(instance)
+    return [disk.ToDict()
+            for disk in AnnotateDiskParams(instance.disk_template,
+                                           disks, diskparams)]
 
 
-    This is a multi-node call.
+  def _SingleDiskDictDP(self, (disk, instance)):
+    """Wrapper for L{AnnotateDiskParams}.
 
     """
 
     """
-    return self._MultiNodeCall(node_list, "test_delay", [duration])
+    (anno_disk,) = self._DisksDictDP(([disk], instance))
+    return anno_disk
 
 
-  def call_file_storage_dir_create(self, node, file_storage_dir):
-    """Create the given file storage directory.
 
 
-    This is a single-node call.
+class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
+  """RPC wrappers for job queue.
 
 
-    """
-    return self._SingleNodeCall(node, "file_storage_dir_create",
-                                [file_storage_dir])
-
-  def call_file_storage_dir_remove(self, node, file_storage_dir):
-    """Remove the given file storage directory.
-
-    This is a single-node call.
+  """
+  def __init__(self, context, address_list):
+    """Initializes this class.
 
     """
 
     """
-    return self._SingleNodeCall(node, "file_storage_dir_remove",
-                                [file_storage_dir])
-
-  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
-                                   new_file_storage_dir):
-    """Rename file storage directory.
+    if address_list is None:
+      resolver = compat.partial(_SsconfResolver, True)
+    else:
+      # Caller provided an address list
+      resolver = _StaticResolver(address_list)
 
 
-    This is a single-node call.
+    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
+                            lock_monitor_cb=context.glm.AddToLockMonitor)
+    _generated_rpc.RpcClientJobQueue.__init__(self)
 
 
-    """
-    return self._SingleNodeCall(node, "file_storage_dir_rename",
-                                [old_file_storage_dir, new_file_storage_dir])
 
 
-  @classmethod
-  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
-    """Update job queue.
+class BootstrapRunner(_RpcClientBase,
+                      _generated_rpc.RpcClientBootstrap,
+                      _generated_rpc.RpcClientDnsOnly):
+  """RPC wrappers for bootstrapping.
 
 
-    This is a multi-node call.
+  """
+  def __init__(self):
+    """Initializes this class.
 
     """
 
     """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
-                                    [file_name, cls._Compress(content)],
-                                    address_list=address_list)
+    # Pylint doesn't recognize multiple inheritance properly, see
+    # <http://www.logilab.org/ticket/36586> and
+    # <http://www.logilab.org/ticket/35642>
+    # pylint: disable=W0233
+    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
+                            _ENCODERS.get)
+    _generated_rpc.RpcClientBootstrap.__init__(self)
+    _generated_rpc.RpcClientDnsOnly.__init__(self)
 
 
-  @classmethod
-  def call_jobqueue_purge(cls, node):
-    """Purge job queue.
 
 
-    This is a single-node call.
+class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
+  """RPC wrappers for calls using only DNS.
 
 
-    """
-    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
-
-  @classmethod
-  def call_jobqueue_rename(cls, node_list, address_list, rename):
-    """Rename a job queue file.
-
-    This is a multi-node call.
+  """
+  def __init__(self):
+    """Initialize this class.
 
     """
 
     """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
-                                    address_list=address_list)
+    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
+                            _ENCODERS.get)
+    _generated_rpc.RpcClientDnsOnly.__init__(self)
 
 
-  @classmethod
-  def call_jobqueue_set_drain(cls, node_list, drain_flag):
-    """Set the drain flag on the queue.
 
 
-    This is a multi-node call.
+class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
+  """RPC wrappers for L{config}.
 
 
-    @type node_list: list
-    @param node_list: the list of nodes to query
-    @type drain_flag: bool
-    @param drain_flag: if True, will set the drain flag, otherwise reset it.
+  """
+  def __init__(self, context, address_list, _req_process_fn=None,
+               _getents=None):
+    """Initializes this class.
 
     """
 
     """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
-                                    [drain_flag])
+    if context:
+      lock_monitor_cb = context.glm.AddToLockMonitor
+    else:
+      lock_monitor_cb = None
 
 
-  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
-    """Validate the hypervisor params.
+    if address_list is None:
+      resolver = compat.partial(_SsconfResolver, True)
+    else:
+      # Caller provided an address list
+      resolver = _StaticResolver(address_list)
 
 
-    This is a multi-node call.
+    encoders = _ENCODERS.copy()
 
 
-    @type node_list: list
-    @param node_list: the list of nodes to query
-    @type hvname: string
-    @param hvname: the hypervisor name
-    @type hvparams: dict
-    @param hvparams: the hypervisor parameters to be validated
+    encoders.update({
+      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+      })
 
 
-    """
-    cluster = self._cfg.GetClusterInfo()
-    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
-    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
-                               [hvname, hv_full])
+    _RpcClientBase.__init__(self, resolver, encoders.get,
+                            lock_monitor_cb=lock_monitor_cb,
+                            _req_process_fn=_req_process_fn)
+    _generated_rpc.RpcClientConfig.__init__(self)