import os
import socket
import logging
+import zlib
+import base64
from ganeti import utils
from ganeti import objects
calls we can't raise an exception just because one out of many
failed, and therefore we use this class to encapsulate the result.
+ @ivar data: the data payload, for successful results, or None
+ @type failed: boolean
+ @ivar failed: whether the operation failed at RPC level (not
+ application level on the remote node)
+ @ivar call: the name of the RPC call
+ @ivar node: the name of the node to which we made the call
+ @ivar offline: whether the operation failed because the node was
+ offline, as opposed to actual failure; offline=True will always
+ imply failed=True, in order to allow simpler checking if
+ the user doesn't care about the exact failure mode
+
"""
- def __init__(self, data, failed=False, call=None, node=None):
+ def __init__(self, data=None, failed=False, offline=False,
+ call=None, node=None):
self.failed = failed
- self.call = None
- self.node = None
- if failed:
+ self.offline = offline
+ self.call = call
+ self.node = node
+ if offline:
+ self.failed = True
+ self.error = "Node is marked offline"
+ self.data = self.payload = None
+ elif failed:
self.error = data
- self.data = None
+ self.data = self.payload = None
else:
self.data = data
self.error = None
+ if isinstance(data, (tuple, list)) and len(data) == 2:
+ self.payload = data[1]
+ else:
+ self.payload = None
def Raise(self):
"""If the result has failed, raise an OpExecError.
raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
(self.call, self.node, self.error))
+ def RemoteFailMsg(self):
+ """Check if the remote procedure failed.
+
+ This is valid only for RPC calls which return a result of the form
+ (status, data | error_msg).
+
+ @return: empty string for success, otherwise an error message
+
+ """
+ def _EnsureErr(val):
+ """Helper to ensure we return a 'True' value for error."""
+ if val:
+ return val
+ else:
+ return "No error information"
+
+ if self.failed:
+ return _EnsureErr(self.error)
+ if not isinstance(self.data, (tuple, list)):
+ return "Invalid result type (%s)" % type(self.data)
+ if len(self.data) != 2:
+ return "Invalid result length (%d), expected 2" % len(self.data)
+ if not self.data[0]:
+ return _EnsureErr(self.data[1])
+ return ""
+
class Client:
"""RPC Client class.
"""Call nodes and return results.
@rtype: list
- @returns: List of RPC results
+ @return: List of RPC results
"""
assert _http_manager, "RPC module not intialized"
else:
msg = req.resp_body
- logging.error("RPC error from node %s: %s", name, msg)
+ logging.error("RPC error in %s from node %s: %s",
+ self.procedure, name, msg)
results[name] = RpcResult(data=msg, failed=True, node=name,
call=self.procedure)
self._cfg = cfg
self.port = utils.GetNodeDaemonPort()
- def _InstDict(self, instance):
+ def _InstDict(self, instance, hvp=None, bep=None):
"""Convert the given instance to a dict.
This is done via the instance's ToDict() method and additionally
@type instance: L{objects.Instance}
@param instance: an Instance object
+ @type hvp: dict or None
+ @param hvp: a dictionary with overridden hypervisor parameters
+ @type bep: dict or None
+ @param bep: a dictionary with overridden backend parameters
@rtype: dict
@return: the instance dict, with the hvparams filled with the
cluster defaults
idict = instance.ToDict()
cluster = self._cfg.GetClusterInfo()
idict["hvparams"] = cluster.FillHV(instance)
+ if hvp is not None:
+ idict["hvparams"].update(hvp)
idict["beparams"] = cluster.FillBE(instance)
+ if bep is not None:
+ idict["beparams"].update(bep)
return idict
- def _ConnectList(self, client, node_list):
+ def _ConnectList(self, client, node_list, call):
"""Helper for computing node addresses.
@type client: L{Client}
@param client: a C{Client} instance
@type node_list: list
@param node_list: the node list we should connect
+ @type call: string
+ @param call: the name of the remote procedure call, used to fill in
+ the results of any offline nodes
"""
all_nodes = self._cfg.GetAllNodesInfo()
+ name_list = []
addr_list = []
+ skip_dict = {}
for node in node_list:
if node in all_nodes:
+ if all_nodes[node].offline:
+ skip_dict[node] = RpcResult(node=node, offline=True, call=call)
+ continue
val = all_nodes[node].primary_ip
else:
val = None
addr_list.append(val)
- client.ConnectList(node_list, address_list=addr_list)
+ name_list.append(node)
+ if name_list:
+ client.ConnectList(name_list, address_list=addr_list)
+ return skip_dict
- def _ConnectNode(self, client, node):
+ def _ConnectNode(self, client, node, call):
"""Helper for computing one node's address.
@type client: L{Client}
@param client: a C{Client} instance
@type node: str
@param node: the node we should connect
+ @type call: string
+ @param call: the name of the remote procedure call, used to fill in
+ the result if the node is offline
"""
node_info = self._cfg.GetNodeInfo(node)
if node_info is not None:
+ if node_info.offline:
+ return RpcResult(node=node, offline=True, call=call)
addr = node_info.primary_ip
else:
addr = None
client.ConnectNode(node, address=addr)
- def _MultiNodeCall(self, node_list, procedure, args,
- address_list=None):
+ def _MultiNodeCall(self, node_list, procedure, args):
"""Helper for making a multi-node call
"""
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, self.port)
- if address_list is None:
- self._ConnectList(c, node_list)
- else:
- c.ConnectList(node_list, address_list=address_list)
- return c.GetResults()
+ skip_dict = self._ConnectList(c, node_list, procedure)
+ skip_dict.update(c.GetResults())
+ return skip_dict
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
"""
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, self.port)
- self._ConnectNode(c, node)
- return c.GetResults().get(node, False)
+ result = self._ConnectNode(c, node, procedure)
+ if result is None:
+ # we did connect, node is not offline
+ result = c.GetResults()[node]
+ return result
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args):
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, utils.GetNodeDaemonPort())
c.ConnectNode(node)
- return c.GetResults().get(node, False)
+ return c.GetResults()[node]
+
+ @staticmethod
+ def _Compress(data):
+ """Compresses a string for transport over RPC.
+
+ Small amounts of data are not compressed.
+
+ @type data: str
+ @param data: Data
+ @rtype: tuple
+ @return: Encoded data to send
+
+ """
+ # Small amounts of data are not compressed
+ if len(data) < 512:
+ return (constants.RPC_ENCODING_NONE, data)
+
+ # Compress with zlib and encode in base64
+ return (constants.RPC_ENCODING_ZLIB_BASE64,
+ base64.b64encode(zlib.compress(data, 3)))
#
# Begin RPC calls
"""
return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
- def call_instance_start(self, node, instance, extra_args):
+ def call_instance_start(self, node, instance, hvp, bep):
"""Starts an instance.
This is a single-node call.
"""
- return self._SingleNodeCall(node, "instance_start",
- [self._InstDict(instance), extra_args])
+ idict = self._InstDict(instance, hvp=hvp, bep=bep)
+ return self._SingleNodeCall(node, "instance_start", [idict])
def call_instance_shutdown(self, node, instance):
"""Stops an instance.
return self._SingleNodeCall(node, "instance_shutdown",
[self._InstDict(instance)])
+ def call_migration_info(self, node, instance):
+ """Gather the information necessary to prepare an instance migration.
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the node on which the instance is currently running
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+
+ """
+ return self._SingleNodeCall(node, "migration_info",
+ [self._InstDict(instance)])
+
+ def call_accept_instance(self, node, instance, info, target):
+ """Prepare a node to accept an instance.
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the target node for the migration
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type info: opaque/hypervisor specific (string/data)
+ @param info: result for the call_migration_info call
+ @type target: string
+ @param target: target hostname (usually ip address) (on the node itself)
+
+ """
+ return self._SingleNodeCall(node, "accept_instance",
+ [self._InstDict(instance), info, target])
+
+ def call_finalize_migration(self, node, instance, info, success):
+ """Finalize any target-node migration specific operation.
+
+ This is called both in case of a successful migration and in case of error
+ (in which case it should abort the migration).
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the target node for the migration
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type info: opaque/hypervisor specific (string/data)
+ @param info: result for the call_migration_info call
+ @type success: boolean
+ @param success: whether the migration was a success or a failure
+
+ """
+ return self._SingleNodeCall(node, "finalize_migration",
+ [self._InstDict(instance), info, success])
+
def call_instance_migrate(self, node, instance, target, live):
"""Migrate an instance.
return self._SingleNodeCall(node, "instance_migrate",
[self._InstDict(instance), target, live])
- def call_instance_reboot(self, node, instance, reboot_type, extra_args):
+ def call_instance_reboot(self, node, instance, reboot_type):
"""Reboots an instance.
This is a single-node call.
"""
return self._SingleNodeCall(node, "instance_reboot",
- [self._InstDict(instance), reboot_type,
- extra_args])
+ [self._InstDict(instance), reboot_type])
- def call_instance_os_add(self, node, inst):
+ def call_instance_os_add(self, node, inst, reinstall):
"""Installs an OS on the given instance.
This is a single-node call.
"""
return self._SingleNodeCall(node, "instance_os_add",
- [self._InstDict(inst)])
+ [self._InstDict(inst), reinstall])
def call_instance_run_rename(self, node, inst, old_name):
"""Run the OS rename script for an instance.
"""
return self._SingleNodeCall(node, "instance_info", [instance, hname])
+ def call_instance_migratable(self, node, instance):
+ """Checks whether the given instance can be migrated.
+
+ This is a single-node call.
+
+ @param node: the node to query
+ @type instance: L{objects.Instance}
+ @param instance: the instance to check
+
+
+ """
+ return self._SingleNodeCall(node, "instance_migratable",
+ [self._InstDict(instance)])
+
def call_all_instances_info(self, node_list, hypervisor_list):
"""Returns information about all instances on the given nodes.
@type node_list: list
@param node_list: the list of nodes to query
- @type vgname: C{string}
- @param vgname: the name of the volume group to ask for disk space
+ @type vg_name: C{string}
+ @param vg_name: the name of the volume group to ask for disk space
information
@type hypervisor_type: C{str}
@param hypervisor_type: the name of the hypervisor to ask for
for result in retux.itervalues():
if result.failed or not isinstance(result.data, dict):
result.data = {}
+ if result.offline:
+ log_name = None
+ else:
+ log_name = "call_node_info"
utils.CheckDict(result.data, {
'memory_total' : '-',
'memory_free' : '-',
'vg_size' : 'node_unreachable',
'vg_free' : '-',
- }, "call_node_info")
+ }, log_name)
return retux
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
"""
return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
- def call_blockdev_close(self, node, disks):
+ def call_blockdev_close(self, node, instance_name, disks):
"""Closes the given block devices.
This is a single-node call.
"""
- return self._SingleNodeCall(node, "blockdev_close",
- [cf.ToDict() for cf in disks])
+ params = [instance_name, [cf.ToDict() for cf in disks]]
+ return self._SingleNodeCall(node, "blockdev_close", params)
+
+ def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
+ """Disconnects the network of the given drbd devices.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "drbd_disconnect_net",
+ [nodes_ip, [cf.ToDict() for cf in disks]])
+
+ def call_drbd_attach_net(self, node_list, nodes_ip,
+ disks, instance_name, multimaster):
+ """Attaches the network of the given drbd devices.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "drbd_attach_net",
+ [nodes_ip, [cf.ToDict() for cf in disks],
+ instance_name, multimaster])
+
+ def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
+ """Waits until the synchronization of the given drbd devices is complete.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "drbd_wait_sync",
+ [nodes_ip, [cf.ToDict() for cf in disks]])
@classmethod
def call_upload_file(cls, node_list, file_name, address_list=None):
to optimize the RPC speed
"""
- data = utils.ReadFile(file_name)
+ file_contents = utils.ReadFile(file_name)
+ data = cls._Compress(file_contents)
st = os.stat(file_name)
params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
st.st_atime, st.st_mtime]
"""
result = self._MultiNodeCall(node_list, "os_diagnose", [])
- for node_name, node_result in result.iteritems():
+ for node_result in result.values():
if not node_result.failed and node_result.data:
node_result.data = [objects.OS.FromDict(oss)
for oss in node_result.data]
"""
return self._SingleNodeCall(node, "node_demote_from_mc", [])
+
+ def call_node_powercycle(self, node, hypervisor):
+ """Tries to powercycle a node.
+
+ This is a single-node call.
+
+ """
+ return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
+
+
def call_test_delay(self, node_list, duration):
"""Sleep for a fixed time on given node(s).
"""
return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
- [file_name, content],
+ [file_name, cls._Compress(content)],
address_list=address_list)
@classmethod
return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
- def call_jobqueue_rename(cls, node_list, address_list, old, new):
+ def call_jobqueue_rename(cls, node_list, address_list, rename):
"""Rename a job queue file.
This is a multi-node call.
"""
- return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
+ return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
address_list=address_list)
@classmethod