Simplify handling of regular fields in LUQuery*
[ganeti-local] / lib / rpc.py
index 368d1f7..8101538 100644 (file)
 # 02110-1301, USA.
 
 
 # 02110-1301, USA.
 
 
-"""Script to show add a new node to the cluster
+"""Inter-node RPC library.
 
 """
 
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable-msg=C0103,R0201,R0904
+# C0103: Invalid name, since call_ are not valid
+# R0201: Method could be a function, we keep all rpcs instance methods
+# as not to change them back and forth between static/instance methods
+# if they need to start using instance attributes
+# R0904: Too many public methods
 
 import os
 
 import os
-import socket
-import httplib
+import logging
+import zlib
+import base64
 
 
-import simplejson
-
-from ganeti import logger
 from ganeti import utils
 from ganeti import objects
 from ganeti import utils
 from ganeti import objects
-from ganeti import ssconf
+from ganeti import http
+from ganeti import serializer
+from ganeti import constants
+from ganeti import errors
+
+import ganeti.http.client
+
+
+# Module level variable
+_http_manager = None
+
+
+def Init():
+  """Initializes the module-global HTTP client manager.
+
+  Must be called before using any RPC function.
+
+  """
+  global _http_manager
+
+  assert not _http_manager, "RPC module initialized more than once"
+
+  _http_manager = http.client.HttpClientManager()
+
+
+def Shutdown():
+  """Stops the module-global HTTP client manager.
+
+  Must be called before quitting the program.
 
 
+  """
+  global _http_manager
+
+  if _http_manager:
+    _http_manager.Shutdown()
+    _http_manager = None
+
+
+class RpcResult(object):
+  """RPC Result class.
 
 
-class NodeController:
-  """Node-handling class.
+  This class holds an RPC result. It is needed since in multi-node
+  calls we can't raise an exception just because one one out of many
+  failed, and therefore we use this class to encapsulate the result.
 
 
-  For each node that we speak with, we create an instance of this
-  class, so that we have a safe place to store the details of this
-  individual call.
+  @ivar data: the data payload, for successful results, or None
+  @ivar call: the name of the RPC call
+  @ivar node: the name of the node to which we made the call
+  @ivar offline: whether the operation failed because the node was
+      offline, as opposed to actual failure; offline=True will always
+      imply failed=True, in order to allow simpler checking if
+      the user doesn't care about the exact failure mode
+  @ivar fail_msg: the error message if the call failed
 
   """
 
   """
-  def __init__(self, parent, node):
-    self.parent = parent
+  def __init__(self, data=None, failed=False, offline=False,
+               call=None, node=None):
+    self.offline = offline
+    self.call = call
     self.node = node
     self.node = node
-    self.failed = False
-
-    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
-    try:
-      hc.connect()
-      hc.putrequest('PUT', "/%s" % self.parent.procedure,
-                    skip_accept_encoding=True)
-      hc.putheader('Content-Length', str(len(parent.body)))
-      hc.endheaders()
-      hc.send(parent.body)
-    except socket.error, err:
-      logger.Error("Error connecting to %s: %s" % (node, str(err)))
-      self.failed = True
-
-  def get_response(self):
-    """Try to process the response from the node.
-
-    """
-    if self.failed:
-      # we already failed in connect
-      return False
-    resp = self.http_conn.getresponse()
-    if resp.status != 200:
-      return False
-    try:
-      length = int(resp.getheader('Content-Length', '0'))
-    except ValueError:
-      return False
-    if not length:
-      logger.Error("Zero-length reply from %s" % self.node)
-      return False
-    payload = resp.read(length)
-    unload = simplejson.loads(payload)
-    return unload
+    if offline:
+      self.fail_msg = "Node is marked offline"
+      self.data = self.payload = None
+    elif failed:
+      self.fail_msg = self._EnsureErr(data)
+      self.data = self.payload = None
+    else:
+      self.data = data
+      if not isinstance(self.data, (tuple, list)):
+        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
+                         type(self.data))
+      elif len(data) != 2:
+        self.fail_msg = ("RPC layer error: invalid result length (%d), "
+                         "expected 2" % len(self.data))
+      elif not self.data[0]:
+        self.fail_msg = self._EnsureErr(self.data[1])
+      else:
+        # finally success
+        self.fail_msg = None
+        self.payload = data[1]
+
+  @staticmethod
+  def _EnsureErr(val):
+    """Helper to ensure we return a 'True' value for error."""
+    if val:
+      return val
+    else:
+      return "No error information"
+
+  def Raise(self, msg, prereq=False):
+    """If the result has failed, raise an OpExecError.
+
+    This is used so that LU code doesn't have to check for each
+    result, but instead can call this function.
+
+    """
+    if not self.fail_msg:
+      return
+
+    if not msg: # one could pass None for default message
+      msg = ("Call '%s' to node '%s' has failed: %s" %
+             (self.call, self.node, self.fail_msg))
+    else:
+      msg = "%s: %s" % (msg, self.fail_msg)
+    if prereq:
+      ec = errors.OpPrereqError
+    else:
+      ec = errors.OpExecError
+    raise ec(msg)
 
 
 class Client:
 
 
 class Client:
@@ -91,738 +155,1015 @@ class Client:
   list of nodes, will contact (in parallel) all nodes, and return a
   dict of results (key: node name, value: result).
 
   list of nodes, will contact (in parallel) all nodes, and return a
   dict of results (key: node name, value: result).
 
-  One current bug is that generic failure is still signalled by
+  One current bug is that generic failure is still signaled by
   'False' result, which is not good. This overloading of values can
   cause bugs.
 
   """
   'False' result, which is not good. This overloading of values can
   cause bugs.
 
   """
-  result_set = False
-  result = False
-  allresult = []
-
-  def __init__(self, procedure, args):
-    ss = ssconf.SimpleStore()
-    self.port = ss.GetNodeDaemonPort()
-    self.nodepw = ss.GetNodeDaemonPassword()
-    self.nc = {}
-    self.results = {}
+  def __init__(self, procedure, body, port):
     self.procedure = procedure
     self.procedure = procedure
-    self.args = args
-    self.body = simplejson.dumps(args)
+    self.body = body
+    self.port = port
+    self.nc = {}
 
 
-  #--- generic connector -------------
+    self._ssl_params = \
+      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
+                         ssl_cert_path=constants.SSL_CERT_FILE)
 
 
-  def connect_list(self, node_list):
+  def ConnectList(self, node_list, address_list=None):
     """Add a list of nodes to the target nodes.
 
     """Add a list of nodes to the target nodes.
 
+    @type node_list: list
+    @param node_list: the list of node names to connect
+    @type address_list: list or None
+    @keyword address_list: either None or a list with node addresses,
+        which must have the same length as the node list
+
     """
     """
-    for node in node_list:
-      self.connect(node)
+    if address_list is None:
+      address_list = [None for _ in node_list]
+    else:
+      assert len(node_list) == len(address_list), \
+             "Name and address lists should have the same length"
+    for node, address in zip(node_list, address_list):
+      self.ConnectNode(node, address)
 
 
-  def connect(self, connect_node):
+  def ConnectNode(self, name, address=None):
     """Add a node to the target list.
 
     """Add a node to the target list.
 
+    @type name: str
+    @param name: the node name
+    @type address: str
+    @keyword address: the node address, if known
+
     """
     """
-    self.nc[connect_node] = nc = NodeController(self, connect_node)
+    if address is None:
+      address = name
+
+    self.nc[name] = \
+      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
+                                    "/%s" % self.procedure,
+                                    post_data=self.body,
+                                    ssl_params=self._ssl_params,
+                                    ssl_verify_peer=True)
+
+  def GetResults(self):
+    """Call nodes and return results.
 
 
-  def getresult(self):
-    """Return the results of the call.
+    @rtype: list
+    @return: List of RPC results
 
     """
 
     """
-    return self.results
+    assert _http_manager, "RPC module not initialized"
+
+    _http_manager.ExecRequests(self.nc.values())
+
+    results = {}
+
+    for name, req in self.nc.iteritems():
+      if req.success and req.resp_status_code == http.HTTP_OK:
+        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
+                                  node=name, call=self.procedure)
+        continue
+
+      # TODO: Better error reporting
+      if req.error:
+        msg = req.error
+      else:
+        msg = req.resp_body
+
+      logging.error("RPC error in %s from node %s: %s",
+                    self.procedure, name, msg)
+      results[name] = RpcResult(data=msg, failed=True, node=name,
+                                call=self.procedure)
+
+    return results
+
 
 
-  def run(self):
-    """Wrapper over reactor.run().
+class RpcRunner(object):
+  """RPC runner class"""
 
 
-    This function simply calls reactor.run() if we have any requests
-    queued, otherwise it does nothing.
+  def __init__(self, cfg):
+    """Initialized the rpc runner.
+
+    @type cfg:  C{config.ConfigWriter}
+    @param cfg: the configuration object that will be used to get data
+                about the cluster
 
     """
 
     """
-    for node, nc in self.nc.items():
-      self.results[node] = nc.get_response()
+    self._cfg = cfg
+    self.port = utils.GetDaemonPort(constants.NODED)
 
 
+  def _InstDict(self, instance, hvp=None, bep=None):
+    """Convert the given instance to a dict.
 
 
-def call_volume_list(node_list, vg_name):
-  """Gets the logical volumes present in a given volume group.
+    This is done via the instance's ToDict() method and additionally
+    we fill the hvparams with the cluster defaults.
 
 
-  This is a multi-node call.
+    @type instance: L{objects.Instance}
+    @param instance: an Instance object
+    @type hvp: dict or None
+    @param hvp: a dictionary with overridden hypervisor parameters
+    @type bep: dict or None
+    @param bep: a dictionary with overridden backend parameters
+    @rtype: dict
+    @return: the instance dict, with the hvparams filled with the
+        cluster defaults
 
 
-  """
-  c = Client("volume_list", [vg_name])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    """
+    idict = instance.ToDict()
+    cluster = self._cfg.GetClusterInfo()
+    idict["hvparams"] = cluster.FillHV(instance)
+    if hvp is not None:
+      idict["hvparams"].update(hvp)
+    idict["beparams"] = cluster.FillBE(instance)
+    if bep is not None:
+      idict["beparams"].update(bep)
+    for nic in idict["nics"]:
+      nic['nicparams'] = objects.FillDict(
+        cluster.nicparams[constants.PP_DEFAULT],
+        nic['nicparams'])
+    return idict
+
+  def _ConnectList(self, client, node_list, call):
+    """Helper for computing node addresses.
+
+    @type client: L{ganeti.rpc.Client}
+    @param client: a C{Client} instance
+    @type node_list: list
+    @param node_list: the node list we should connect
+    @type call: string
+    @param call: the name of the remote procedure call, for filling in
+        correctly any eventual offline nodes' results
 
 
+    """
+    all_nodes = self._cfg.GetAllNodesInfo()
+    name_list = []
+    addr_list = []
+    skip_dict = {}
+    for node in node_list:
+      if node in all_nodes:
+        if all_nodes[node].offline:
+          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
+          continue
+        val = all_nodes[node].primary_ip
+      else:
+        val = None
+      addr_list.append(val)
+      name_list.append(node)
+    if name_list:
+      client.ConnectList(name_list, address_list=addr_list)
+    return skip_dict
+
+  def _ConnectNode(self, client, node, call):
+    """Helper for computing one node's address.
+
+    @type client: L{ganeti.rpc.Client}
+    @param client: a C{Client} instance
+    @type node: str
+    @param node: the node we should connect
+    @type call: string
+    @param call: the name of the remote procedure call, for filling in
+        correctly any eventual offline nodes' results
 
 
-def call_vg_list(node_list):
-  """Gets the volume group list.
+    """
+    node_info = self._cfg.GetNodeInfo(node)
+    if node_info is not None:
+      if node_info.offline:
+        return RpcResult(node=node, offline=True, call=call)
+      addr = node_info.primary_ip
+    else:
+      addr = None
+    client.ConnectNode(node, address=addr)
 
 
-  This is a multi-node call.
+  def _MultiNodeCall(self, node_list, procedure, args):
+    """Helper for making a multi-node call
 
 
-  """
-  c = Client("vg_list", [])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, self.port)
+    skip_dict = self._ConnectList(c, node_list, procedure)
+    skip_dict.update(c.GetResults())
+    return skip_dict
 
 
+  @classmethod
+  def _StaticMultiNodeCall(cls, node_list, procedure, args,
+                           address_list=None):
+    """Helper for making a multi-node static call
 
 
-def call_bridges_exist(node, bridges_list):
-  """Checks if a node has all the bridges given.
+    """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
+    c.ConnectList(node_list, address_list=address_list)
+    return c.GetResults()
 
 
-  This method checks if all bridges given in the bridges_list are
-  present on the remote node, so that an instance that uses interfaces
-  on those bridges can be started.
+  def _SingleNodeCall(self, node, procedure, args):
+    """Helper for making a single-node call
 
 
-  This is a single-node call.
+    """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, self.port)
+    result = self._ConnectNode(c, node, procedure)
+    if result is None:
+      # we did connect, node is not offline
+      result = c.GetResults()[node]
+    return result
 
 
-  """
-  c = Client("bridges_exist", [bridges_list])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  @classmethod
+  def _StaticSingleNodeCall(cls, node, procedure, args):
+    """Helper for making a single-node static call
 
 
+    """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
+    c.ConnectNode(node)
+    return c.GetResults()[node]
 
 
-def call_instance_start(node, instance, extra_args):
-  """Starts an instance.
+  @staticmethod
+  def _Compress(data):
+    """Compresses a string for transport over RPC.
 
 
-  This is a single-node call.
+    Small amounts of data are not compressed.
 
 
-  """
-  c = Client("instance_start", [instance.ToDict(), extra_args])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    @type data: str
+    @param data: Data
+    @rtype: tuple
+    @return: Encoded data to send
 
 
+    """
+    # Small amounts of data are not compressed
+    if len(data) < 512:
+      return (constants.RPC_ENCODING_NONE, data)
 
 
-def call_instance_shutdown(node, instance):
-  """Stops an instance.
+    # Compress with zlib and encode in base64
+    return (constants.RPC_ENCODING_ZLIB_BASE64,
+            base64.b64encode(zlib.compress(data, 3)))
 
 
-  This is a single-node call.
+  #
+  # Begin RPC calls
+  #
 
 
-  """
-  c = Client("instance_shutdown", [instance.ToDict()])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_lv_list(self, node_list, vg_name):
+    """Gets the logical volumes present in a given volume group.
 
 
+    This is a multi-node call.
 
 
-def call_instance_migrate(node, instance, target, live):
-  """Migrate an instance.
+    """
+    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
 
 
-  This is a single-node call.
+  def call_vg_list(self, node_list):
+    """Gets the volume group list.
 
 
-  """
-  c = Client("instance_migrate", [instance.name, target, live])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a multi-node call.
 
 
+    """
+    return self._MultiNodeCall(node_list, "vg_list", [])
 
 
-def call_instance_reboot(node, instance, reboot_type, extra_args):
-  """Reboots an instance.
+  def call_storage_list(self, node_list, su_name, su_args, name, fields):
+    """Get list of storage units.
 
 
-  This is a single-node call.
+    This is a multi-node call.
 
 
-  """
-  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._MultiNodeCall(node_list, "storage_list",
+                               [su_name, su_args, name, fields])
 
 
+  def call_storage_modify(self, node, su_name, su_args, name, changes):
+    """Modify a storage unit.
 
 
-def call_instance_os_add(node, inst, osdev, swapdev):
-  """Installs an OS on the given instance.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    """
+    return self._SingleNodeCall(node, "storage_modify",
+                                [su_name, su_args, name, changes])
 
 
-  """
-  params = [inst.ToDict(), osdev, swapdev]
-  c = Client("instance_os_add", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_storage_execute(self, node, su_name, su_args, name, op):
+    """Executes an operation on a storage unit.
 
 
+    This is a single-node call.
 
 
-def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
-  """Run the OS rename script for an instance.
+    """
+    return self._SingleNodeCall(node, "storage_execute",
+                                [su_name, su_args, name, op])
 
 
-  This is a single-node call.
+  def call_bridges_exist(self, node, bridges_list):
+    """Checks if a node has all the bridges given.
 
 
-  """
-  params = [inst.ToDict(), old_name, osdev, swapdev]
-  c = Client("instance_run_rename", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This method checks if all bridges given in the bridges_list are
+    present on the remote node, so that an instance that uses interfaces
+    on those bridges can be started.
 
 
+    This is a single-node call.
 
 
-def call_instance_info(node, instance):
-  """Returns information about a single instance.
+    """
+    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
 
 
-  This is a single-node call.
+  def call_instance_start(self, node, instance, hvp, bep):
+    """Starts an instance.
 
 
-  """
-  c = Client("instance_info", [instance])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a single-node call.
 
 
+    """
+    idict = self._InstDict(instance, hvp=hvp, bep=bep)
+    return self._SingleNodeCall(node, "instance_start", [idict])
 
 
-def call_all_instances_info(node_list):
-  """Returns information about all instances on a given node.
+  def call_instance_shutdown(self, node, instance):
+    """Stops an instance.
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("all_instances_info", [])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    """
+    return self._SingleNodeCall(node, "instance_shutdown",
+                                [self._InstDict(instance)])
 
 
+  def call_migration_info(self, node, instance):
+    """Gather the information necessary to prepare an instance migration.
 
 
-def call_instance_list(node_list):
-  """Returns the list of running instances on a given node.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    @type node: string
+    @param node: the node on which the instance is currently running
+    @type instance: C{objects.Instance}
+    @param instance: the instance definition
 
 
-  """
-  c = Client("instance_list", [])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    """
+    return self._SingleNodeCall(node, "migration_info",
+                                [self._InstDict(instance)])
 
 
+  def call_accept_instance(self, node, instance, info, target):
+    """Prepare a node to accept an instance.
 
 
-def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
-  """Do a TcpPing on the remote node
+    This is a single-node call.
 
 
-  This is a single-node call.
-  """
-  c = Client("node_tcp_ping", [source, target, port, timeout,
-                               live_port_needed])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    @type node: string
+    @param node: the target node for the migration
+    @type instance: C{objects.Instance}
+    @param instance: the instance definition
+    @type info: opaque/hypervisor specific (string/data)
+    @param info: result for the call_migration_info call
+    @type target: string
+    @param target: target hostname (usually ip address) (on the node itself)
 
 
+    """
+    return self._SingleNodeCall(node, "accept_instance",
+                                [self._InstDict(instance), info, target])
 
 
-def call_node_info(node_list, vg_name):
-  """Return node information.
+  def call_finalize_migration(self, node, instance, info, success):
+    """Finalize any target-node migration specific operation.
 
 
-  This will return memory information and volume group size and free
-  space.
+    This is called both in case of a successful migration and in case of error
+    (in which case it should abort the migration).
 
 
-  This is a multi-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("node_info", [vg_name])
-  c.connect_list(node_list)
-  c.run()
-  retux = c.getresult()
+    @type node: string
+    @param node: the target node for the migration
+    @type instance: C{objects.Instance}
+    @param instance: the instance definition
+    @type info: opaque/hypervisor specific (string/data)
+    @param info: result for the call_migration_info call
+    @type success: boolean
+    @param success: whether the migration was a success or a failure
 
 
-  for node_name in retux:
-    ret = retux.get(node_name, False)
-    if type(ret) != dict:
-      logger.Error("could not connect to node %s" % (node_name))
-      ret = {}
+    """
+    return self._SingleNodeCall(node, "finalize_migration",
+                                [self._InstDict(instance), info, success])
 
 
-    utils.CheckDict(ret,
-                    { 'memory_total' : '-',
-                      'memory_dom0' : '-',
-                      'memory_free' : '-',
-                      'vg_size' : 'node_unreachable',
-                      'vg_free' : '-' },
-                    "call_node_info",
-                    )
-  return retux
+  def call_instance_migrate(self, node, instance, target, live):
+    """Migrate an instance.
 
 
+    This is a single-node call.
 
 
-def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
-  """Add a node to the cluster.
+    @type node: string
+    @param node: the node on which the instance is currently running
+    @type instance: C{objects.Instance}
+    @param instance: the instance definition
+    @type target: string
+    @param target: the target node name
+    @type live: boolean
+    @param live: whether the migration should be done live or not (the
+        interpretation of this parameter is left to the hypervisor)
 
 
-  This is a single-node call.
+    """
+    return self._SingleNodeCall(node, "instance_migrate",
+                                [self._InstDict(instance), target, live])
 
 
-  """
-  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
-  c = Client("node_add", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_instance_reboot(self, node, instance, reboot_type):
+    """Reboots an instance.
 
 
+    This is a single-node call.
 
 
-def call_node_verify(node_list, checkdict):
-  """Request verification of given parameters.
+    """
+    return self._SingleNodeCall(node, "instance_reboot",
+                                [self._InstDict(instance), reboot_type])
 
 
-  This is a multi-node call.
+  def call_instance_os_add(self, node, inst, reinstall):
+    """Installs an OS on the given instance.
 
 
-  """
-  c = Client("node_verify", [checkdict])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    This is a single-node call.
 
 
+    """
+    return self._SingleNodeCall(node, "instance_os_add",
+                                [self._InstDict(inst), reinstall])
 
 
-def call_node_start_master(node, start_daemons):
-  """Tells a node to activate itself as a master.
+  def call_instance_run_rename(self, node, inst, old_name):
+    """Run the OS rename script for an instance.
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("node_start_master", [start_daemons])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._SingleNodeCall(node, "instance_run_rename",
+                                [self._InstDict(inst), old_name])
 
 
+  def call_instance_info(self, node, instance, hname):
+    """Returns information about a single instance.
 
 
-def call_node_stop_master(node, stop_daemons):
-  """Tells a node to demote itself from master status.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    @type node: list
+    @param node: the list of nodes to query
+    @type instance: string
+    @param instance: the instance name
+    @type hname: string
+    @param hname: the hypervisor type of the instance
 
 
-  """
-  c = Client("node_stop_master", [stop_daemons])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._SingleNodeCall(node, "instance_info", [instance, hname])
 
 
+  def call_instance_migratable(self, node, instance):
+    """Checks whether the given instance can be migrated.
 
 
-def call_version(node_list):
-  """Query node version.
+    This is a single-node call.
 
 
-  This is a multi-node call.
+    @param node: the node to query
+    @type instance: L{objects.Instance}
+    @param instance: the instance to check
 
 
-  """
-  c = Client("version", [])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
 
 
+    """
+    return self._SingleNodeCall(node, "instance_migratable",
+                                [self._InstDict(instance)])
 
 
-def call_blockdev_create(node, bdev, size, owner, on_primary, info):
-  """Request creation of a given block device.
+  def call_all_instances_info(self, node_list, hypervisor_list):
+    """Returns information about all instances on the given nodes.
 
 
-  This is a single-node call.
+    This is a multi-node call.
 
 
-  """
-  params = [bdev.ToDict(), size, owner, on_primary, info]
-  c = Client("blockdev_create", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    @type node_list: list
+    @param node_list: the list of nodes to query
+    @type hypervisor_list: list
+    @param hypervisor_list: the hypervisors to query for instances
 
 
+    """
+    return self._MultiNodeCall(node_list, "all_instances_info",
+                               [hypervisor_list])
 
 
-def call_blockdev_remove(node, bdev):
-  """Request removal of a given block device.
+  def call_instance_list(self, node_list, hypervisor_list):
+    """Returns the list of running instances on a given node.
 
 
-  This is a single-node call.
+    This is a multi-node call.
 
 
-  """
-  c = Client("blockdev_remove", [bdev.ToDict()])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    @type node_list: list
+    @param node_list: the list of nodes to query
+    @type hypervisor_list: list
+    @param hypervisor_list: the hypervisors to query for instances
 
 
+    """
+    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
 
 
-def call_blockdev_rename(node, devlist):
-  """Request rename of the given block devices.
+  def call_node_tcp_ping(self, node, source, target, port, timeout,
+                         live_port_needed):
+    """Do a TcpPing on the remote node
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  params = [(d.ToDict(), uid) for d, uid in devlist]
-  c = Client("blockdev_rename", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._SingleNodeCall(node, "node_tcp_ping",
+                                [source, target, port, timeout,
+                                 live_port_needed])
 
 
+  def call_node_has_ip_address(self, node, address):
+    """Checks if a node has the given IP address.
 
 
-def call_blockdev_assemble(node, disk, owner, on_primary):
-  """Request assembling of a given block device.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    """
+    return self._SingleNodeCall(node, "node_has_ip_address", [address])
 
 
-  """
-  params = [disk.ToDict(), owner, on_primary]
-  c = Client("blockdev_assemble", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_node_info(self, node_list, vg_name, hypervisor_type):
+    """Return node information.
 
 
+    This will return memory information and volume group size and free
+    space.
 
 
-def call_blockdev_shutdown(node, disk):
-  """Request shutdown of a given block device.
+    This is a multi-node call.
 
 
-  This is a single-node call.
+    @type node_list: list
+    @param node_list: the list of nodes to query
+    @type vg_name: C{string}
+    @param vg_name: the name of the volume group to ask for disk space
+        information
+    @type hypervisor_type: C{str}
+    @param hypervisor_type: the name of the hypervisor to ask for
+        memory information
 
 
-  """
-  c = Client("blockdev_shutdown", [disk.ToDict()])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._MultiNodeCall(node_list, "node_info",
+                               [vg_name, hypervisor_type])
 
 
+  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
+    """Add a node to the cluster.
 
 
-def call_blockdev_addchildren(node, bdev, ndevs):
-  """Request adding a list of children to a (mirroring) device.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    """
+    return self._SingleNodeCall(node, "node_add",
+                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
 
 
-  """
-  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
-  c = Client("blockdev_addchildren", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_node_verify(self, node_list, checkdict, cluster_name):
+    """Request verification of given parameters.
 
 
+    This is a multi-node call.
 
 
-def call_blockdev_removechildren(node, bdev, ndevs):
-  """Request removing a list of children from a (mirroring) device.
+    """
+    return self._MultiNodeCall(node_list, "node_verify",
+                               [checkdict, cluster_name])
 
 
-  This is a single-node call.
+  @classmethod
+  def call_node_start_master(cls, node, start_daemons, no_voting):
+    """Tells a node to activate itself as a master.
 
 
-  """
-  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
-  c = Client("blockdev_removechildren", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a single-node call.
 
 
+    """
+    return cls._StaticSingleNodeCall(node, "node_start_master",
+                                     [start_daemons, no_voting])
 
 
-def call_blockdev_getmirrorstatus(node, disks):
-  """Request status of a (mirroring) device.
+  @classmethod
+  def call_node_stop_master(cls, node, stop_daemons):
+    """Tells a node to demote itself from master status.
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  params = [dsk.ToDict() for dsk in disks]
-  c = Client("blockdev_getmirrorstatus", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
 
 
+  @classmethod
+  def call_master_info(cls, node_list):
+    """Query master info.
 
 
-def call_blockdev_find(node, disk):
-  """Request identification of a given block device.
+    This is a multi-node call.
 
 
-  This is a single-node call.
+    """
+    # TODO: should this method query down nodes?
+    return cls._StaticMultiNodeCall(node_list, "master_info", [])
 
 
-  """
-  c = Client("blockdev_find", [disk.ToDict()])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_version(self, node_list):
+    """Query node version.
 
 
+    This is a multi-node call.
 
 
-def call_blockdev_close(node, disks):
-  """Closes the given block devices.
+    """
+    return self._MultiNodeCall(node_list, "version", [])
 
 
-  This is a single-node call.
+  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
+    """Request creation of a given block device.
 
 
-  """
-  params = [cf.ToDict() for cf in disks]
-  c = Client("blockdev_close", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a single-node call.
 
 
+    """
+    return self._SingleNodeCall(node, "blockdev_create",
+                                [bdev.ToDict(), size, owner, on_primary, info])
 
 
-def call_upload_file(node_list, file_name):
-  """Upload a file.
+  def call_blockdev_remove(self, node, bdev):
+    """Request removal of a given block device.
 
 
-  The node will refuse the operation in case the file is not on the
-  approved file list.
+    This is a single-node call.
 
 
-  This is a multi-node call.
+    """
+    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
 
 
-  """
-  fh = file(file_name)
-  try:
-    data = fh.read()
-  finally:
-    fh.close()
-  st = os.stat(file_name)
-  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
-            st.st_atime, st.st_mtime]
-  c = Client("upload_file", params)
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+  def call_blockdev_rename(self, node, devlist):
+    """Request rename of the given block devices.
 
 
+    This is a single-node call.
 
 
-def call_os_diagnose(node_list):
-  """Request a diagnose of OS definitions.
+    """
+    return self._SingleNodeCall(node, "blockdev_rename",
+                                [(d.ToDict(), uid) for d, uid in devlist])
 
 
-  This is a multi-node call.
+  def call_blockdev_assemble(self, node, disk, owner, on_primary):
+    """Request assembling of a given block device.
 
 
-  """
-  c = Client("os_diagnose", [])
-  c.connect_list(node_list)
-  c.run()
-  result = c.getresult()
-  new_result = {}
-  for node_name in result:
-    if result[node_name]:
-      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
-    else:
-      nr = []
-    new_result[node_name] = nr
-  return new_result
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "blockdev_assemble",
+                                [disk.ToDict(), owner, on_primary])
+
+  def call_blockdev_shutdown(self, node, disk):
+    """Request shutdown of a given block device.
 
 
+    This is a single-node call.
 
 
-def call_os_get(node, name):
-  """Returns an OS definition.
+    """
+    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
 
 
-  This is a single-node call.
+  def call_blockdev_addchildren(self, node, bdev, ndevs):
+    """Request adding a list of children to a (mirroring) device.
 
 
-  """
-  c = Client("os_get", [name])
-  c.connect(node)
-  c.run()
-  result = c.getresult().get(node, False)
-  if isinstance(result, dict):
-    return objects.OS.FromDict(result)
-  else:
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "blockdev_addchildren",
+                                [bdev.ToDict(),
+                                 [disk.ToDict() for disk in ndevs]])
+
+  def call_blockdev_removechildren(self, node, bdev, ndevs):
+    """Request removing a list of children from a (mirroring) device.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "blockdev_removechildren",
+                                [bdev.ToDict(),
+                                 [disk.ToDict() for disk in ndevs]])
+
+  def call_blockdev_getmirrorstatus(self, node, disks):
+    """Request status of a (mirroring) device.
+
+    This is a single-node call.
+
+    """
+    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
+                                  [dsk.ToDict() for dsk in disks])
+    if not result.fail_msg:
+      result.payload = [objects.BlockDevStatus.FromDict(i)
+                        for i in result.payload]
     return result
 
     return result
 
+  def call_blockdev_find(self, node, disk):
+    """Request identification of a given block device.
 
 
-def call_hooks_runner(node_list, hpath, phase, env):
-  """Call the hooks runner.
+    This is a single-node call.
 
 
-  Args:
-    - op: the OpCode instance
-    - env: a dictionary with the environment
+    """
+    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
+    if not result.fail_msg and result.payload is not None:
+      result.payload = objects.BlockDevStatus.FromDict(result.payload)
+    return result
 
 
-  This is a multi-node call.
+  def call_blockdev_close(self, node, instance_name, disks):
+    """Closes the given block devices.
 
 
-  """
-  params = [hpath, phase, env]
-  c = Client("hooks_runner", params)
-  c.connect_list(node_list)
-  c.run()
-  result = c.getresult()
-  return result
+    This is a single-node call.
 
 
+    """
+    params = [instance_name, [cf.ToDict() for cf in disks]]
+    return self._SingleNodeCall(node, "blockdev_close", params)
 
 
-def call_iallocator_runner(node, name, idata):
-  """Call an iallocator on a remote node
+  def call_blockdev_getsizes(self, node, disks):
+    """Returns the size of the given disks.
 
 
-  Args:
-    - name: the iallocator name
-    - input: the json-encoded input string
+    This is a single-node call.
 
 
-  This is a single-node call.
+    """
+    params = [[cf.ToDict() for cf in disks]]
+    return self._SingleNodeCall(node, "blockdev_getsize", params)
 
 
-  """
-  params = [name, idata]
-  c = Client("iallocator_runner", params)
-  c.connect(node)
-  c.run()
-  result = c.getresult().get(node, False)
-  return result
+  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
+    """Disconnects the network of the given drbd devices.
+
+    This is a multi-node call.
 
 
+    """
+    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
+                               [nodes_ip, [cf.ToDict() for cf in disks]])
 
 
-def call_blockdev_grow(node, cf_bdev, amount):
-  """Request a snapshot of the given block device.
+  def call_drbd_attach_net(self, node_list, nodes_ip,
+                           disks, instance_name, multimaster):
+    """Disconnects the given drbd devices.
 
 
-  This is a single-node call.
+    This is a multi-node call.
 
 
-  """
-  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._MultiNodeCall(node_list, "drbd_attach_net",
+                               [nodes_ip, [cf.ToDict() for cf in disks],
+                                instance_name, multimaster])
 
 
+  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
+    """Waits for the synchronization of drbd devices is complete.
 
 
-def call_blockdev_snapshot(node, cf_bdev):
-  """Request a snapshot of the given block device.
+    This is a multi-node call.
 
 
-  This is a single-node call.
+    """
+    return self._MultiNodeCall(node_list, "drbd_wait_sync",
+                               [nodes_ip, [cf.ToDict() for cf in disks]])
 
 
-  """
-  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  @classmethod
+  def call_upload_file(cls, node_list, file_name, address_list=None):
+    """Upload a file.
 
 
+    The node will refuse the operation in case the file is not on the
+    approved file list.
 
 
-def call_snapshot_export(node, snap_bdev, dest_node, instance):
-  """Request the export of a given snapshot.
+    This is a multi-node call.
 
 
-  This is a single-node call.
+    @type node_list: list
+    @param node_list: the list of node names to upload to
+    @type file_name: str
+    @param file_name: the filename to upload
+    @type address_list: list or None
+    @keyword address_list: an optional list of node addresses, in order
+        to optimize the RPC speed
 
 
-  """
-  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
-  c = Client("snapshot_export", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    file_contents = utils.ReadFile(file_name)
+    data = cls._Compress(file_contents)
+    st = os.stat(file_name)
+    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
+              st.st_atime, st.st_mtime]
+    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
+                                    address_list=address_list)
 
 
+  @classmethod
+  def call_write_ssconf_files(cls, node_list, values):
+    """Write ssconf files.
 
 
-def call_finalize_export(node, instance, snap_disks):
-  """Request the completion of an export operation.
+    This is a multi-node call.
 
 
-  This writes the export config file, etc.
+    """
+    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
 
 
-  This is a single-node call.
+  def call_os_diagnose(self, node_list):
+    """Request a diagnose of OS definitions.
 
 
-  """
-  flat_disks = []
-  for disk in snap_disks:
-    flat_disks.append(disk.ToDict())
-  params = [instance.ToDict(), flat_disks]
-  c = Client("finalize_export", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a multi-node call.
 
 
+    """
+    return self._MultiNodeCall(node_list, "os_diagnose", [])
 
 
-def call_export_info(node, path):
-  """Queries the export information in a given path.
+  def call_os_get(self, node, name):
+    """Returns an OS definition.
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("export_info", [path])
-  c.connect(node)
-  c.run()
-  result = c.getresult().get(node, False)
-  if not result:
+    """
+    result = self._SingleNodeCall(node, "os_get", [name])
+    if not result.fail_msg and isinstance(result.data, dict):
+      result.data = objects.OS.FromDict(result.data)
     return result
     return result
-  return objects.SerializableConfigParser.Loads(str(result))
 
 
+  def call_hooks_runner(self, node_list, hpath, phase, env):
+    """Call the hooks runner.
 
 
-def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
-  """Request the import of a backup into an instance.
+    Args:
+      - op: the OpCode instance
+      - env: a dictionary with the environment
 
 
-  This is a single-node call.
+    This is a multi-node call.
 
 
-  """
-  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
-  c = Client("instance_os_import", params)
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    params = [hpath, phase, env]
+    return self._MultiNodeCall(node_list, "hooks_runner", params)
 
 
+  def call_iallocator_runner(self, node, name, idata):
+    """Call an iallocator on a remote node
 
 
-def call_export_list(node_list):
-  """Gets the stored exports list.
+    Args:
+      - name: the iallocator name
+      - input: the json-encoded input string
 
 
-  This is a multi-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("export_list", [])
-  c.connect_list(node_list)
-  c.run()
-  result = c.getresult()
-  return result
+    """
+    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
 
 
+  def call_blockdev_grow(self, node, cf_bdev, amount):
+    """Request a snapshot of the given block device.
 
 
-def call_export_remove(node, export):
-  """Requests removal of a given export.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    """
+    return self._SingleNodeCall(node, "blockdev_grow",
+                                [cf_bdev.ToDict(), amount])
 
 
-  """
-  c = Client("export_remove", [export])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_blockdev_export(self, node, cf_bdev,
+                           dest_node, dest_path, cluster_name):
+    """Export a given disk to another node.
 
 
+    This is a single-node call.
 
 
-def call_node_leave_cluster(node):
-  """Requests a node to clean the cluster information it has.
+    """
+    return self._SingleNodeCall(node, "blockdev_export",
+                                [cf_bdev.ToDict(), dest_node, dest_path,
+                                 cluster_name])
 
 
-  This will remove the configuration information from the ganeti data
-  dir.
+  def call_blockdev_snapshot(self, node, cf_bdev):
+    """Request a snapshot of the given block device.
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("node_leave_cluster", [])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
 
 
+  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
+                           cluster_name, idx):
+    """Request the export of a given snapshot.
 
 
-def call_node_volumes(node_list):
-  """Gets all volumes on node(s).
+    This is a single-node call.
 
 
-  This is a multi-node call.
+    """
+    return self._SingleNodeCall(node, "snapshot_export",
+                                [snap_bdev.ToDict(), dest_node,
+                                 self._InstDict(instance), cluster_name, idx])
 
 
-  """
-  c = Client("node_volumes", [])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+  def call_finalize_export(self, node, instance, snap_disks):
+    """Request the completion of an export operation.
 
 
+    This writes the export config file, etc.
 
 
-def call_test_delay(node_list, duration):
-  """Sleep for a fixed time on given node(s).
+    This is a single-node call.
 
 
-  This is a multi-node call.
+    """
+    flat_disks = []
+    for disk in snap_disks:
+      if isinstance(disk, bool):
+        flat_disks.append(disk)
+      else:
+        flat_disks.append(disk.ToDict())
 
 
-  """
-  c = Client("test_delay", [duration])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    return self._SingleNodeCall(node, "finalize_export",
+                                [self._InstDict(instance), flat_disks])
 
 
+  def call_export_info(self, node, path):
+    """Queries the export information in a given path.
 
 
-def call_file_storage_dir_create(node, file_storage_dir):
-  """Create the given file storage directory.
+    This is a single-node call.
 
 
-  This is a single-node call.
+    """
+    return self._SingleNodeCall(node, "export_info", [path])
 
 
-  """
-  c = Client("file_storage_dir_create", [file_storage_dir])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_instance_os_import(self, node, inst, src_node, src_images,
+                              cluster_name):
+    """Request the import of a backup into an instance.
 
 
+    This is a single-node call.
 
 
-def call_file_storage_dir_remove(node, file_storage_dir):
-  """Remove the given file storage directory.
+    """
+    return self._SingleNodeCall(node, "instance_os_import",
+                                [self._InstDict(inst), src_node, src_images,
+                                 cluster_name])
 
 
-  This is a single-node call.
+  def call_export_list(self, node_list):
+    """Gets the stored exports list.
 
 
-  """
-  c = Client("file_storage_dir_remove", [file_storage_dir])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a multi-node call.
 
 
+    """
+    return self._MultiNodeCall(node_list, "export_list", [])
 
 
-def call_file_storage_dir_rename(node, old_file_storage_dir,
-                                 new_file_storage_dir):
-  """Rename file storage directory.
+  def call_export_remove(self, node, export):
+    """Requests removal of a given export.
 
 
-  This is a single-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("file_storage_dir_rename",
-             [old_file_storage_dir, new_file_storage_dir])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    """
+    return self._SingleNodeCall(node, "export_remove", [export])
 
 
+  @classmethod
+  def call_node_leave_cluster(cls, node):
+    """Requests a node to clean the cluster information it has.
 
 
-def call_jobqueue_update(node_list, file_name, content):
-  """Update job queue.
+    This will remove the configuration information from the ganeti data
+    dir.
 
 
-  This is a multi-node call.
+    This is a single-node call.
 
 
-  """
-  c = Client("jobqueue_update", [file_name, content])
-  c.connect_list(node_list)
-  c.run()
-  result = c.getresult()
-  return result
+    """
+    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
+
+  def call_node_volumes(self, node_list):
+    """Gets all volumes on node(s).
 
 
+    This is a multi-node call.
 
 
-def call_jobqueue_purge(node):
-  """Purge job queue.
+    """
+    return self._MultiNodeCall(node_list, "node_volumes", [])
 
 
-  This is a single-node call.
+  def call_node_demote_from_mc(self, node):
+    """Demote a node from the master candidate role.
 
 
-  """
-  c = Client("jobqueue_purge", [])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "node_demote_from_mc", [])
+
+
+  def call_node_powercycle(self, node, hypervisor):
+    """Tries to powercycle a node.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
+
+
+  def call_test_delay(self, node_list, duration):
+    """Sleep for a fixed time on given node(s).
+
+    This is a multi-node call.
+
+    """
+    return self._MultiNodeCall(node_list, "test_delay", [duration])
+
+  def call_file_storage_dir_create(self, node, file_storage_dir):
+    """Create the given file storage directory.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "file_storage_dir_create",
+                                [file_storage_dir])
+
+  def call_file_storage_dir_remove(self, node, file_storage_dir):
+    """Remove the given file storage directory.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "file_storage_dir_remove",
+                                [file_storage_dir])
+
+  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
+                                   new_file_storage_dir):
+    """Rename file storage directory.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "file_storage_dir_rename",
+                                [old_file_storage_dir, new_file_storage_dir])
+
+  @classmethod
+  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
+    """Update job queue.
+
+    This is a multi-node call.
+
+    """
+    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
+                                    [file_name, cls._Compress(content)],
+                                    address_list=address_list)
+
+  @classmethod
+  def call_jobqueue_purge(cls, node):
+    """Purge job queue.
+
+    This is a single-node call.
+
+    """
+    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
+
+  @classmethod
+  def call_jobqueue_rename(cls, node_list, address_list, rename):
+    """Rename a job queue file.
+
+    This is a multi-node call.
+
+    """
+    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
+                                    address_list=address_list)
+
+  @classmethod
+  def call_jobqueue_set_drain(cls, node_list, drain_flag):
+    """Set the drain flag on the queue.
+
+    This is a multi-node call.
+
+    @type node_list: list
+    @param node_list: the list of nodes to query
+    @type drain_flag: bool
+    @param drain_flag: if True, will set the drain flag, otherwise reset it.
+
+    """
+    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
+                                    [drain_flag])
+
+  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
+    """Validate the hypervisor params.
+
+    This is a multi-node call.
+
+    @type node_list: list
+    @param node_list: the list of nodes to query
+    @type hvname: string
+    @param hvname: the hypervisor name
+    @type hvparams: dict
+    @param hvparams: the hypervisor parameters to be validated
+
+    """
+    cluster = self._cfg.GetClusterInfo()
+    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
+                               [hvname, hv_full])