# R0904: Too many public methods
import os
-import socket
import logging
import zlib
import base64
- calls we can't raise an exception just because one one out of many
+ calls we can't raise an exception just because one out of many
failed, and therefore we use this class to encapsulate the result.
- @ivar data: the data payload, for successfull results, or None
- @type failed: boolean
- @ivar failed: whether the operation failed at RPC level (not
- application level on the remote node)
+ @ivar data: the data payload, for successful results, or None
@ivar call: the name of the RPC call
@ivar node: the name of the node to which we made the call
@ivar offline: whether the operation failed because the node was
offline, as opposed to actual failure; offline=True will always
- imply failed=True, in order to allow simpler checking if
+ imply that fail_msg is set, in order to allow simpler checking if
the user doesn't care about the exact failure mode
+ @ivar fail_msg: the error message if the call failed
"""
def __init__(self, data=None, failed=False, offline=False,
call=None, node=None):
+ """Build a result and classify it as success or failure.
+
+ After construction, C{fail_msg} is None on success and a non-empty
+ error description otherwise; C{payload} holds the second element
+ of a successful (status, payload) result and is None on any failure.
+
+ @param data: the raw result, or the error message if C{failed} is set
+ @type failed: boolean
+ @param failed: whether the call failed at RPC level
+ @type offline: boolean
+ @param offline: whether the node was skipped because it is offline
+ @param call: the name of the RPC call
+ @param node: the name of the node to which we made the call
+
+ """
- self.failed = failed
self.offline = offline
self.call = call
self.node = node
if offline:
- self.failed = True
- self.error = "Node is marked offline"
- self.data = None
+ self.fail_msg = "Node is marked offline"
+ self.data = self.payload = None
elif failed:
- self.error = data
- self.data = None
+ self.fail_msg = self._EnsureErr(data)
+ self.data = self.payload = None
else:
self.data = data
- self.error = None
+ # every failure branch below also resets payload, so the attribute
+ # always exists no matter how the result was classified
+ if not isinstance(self.data, (tuple, list)):
+ self.fail_msg = ("RPC layer error: invalid result type (%s)" %
+ type(self.data))
+ self.payload = None
+ elif len(data) != 2:
+ self.fail_msg = ("RPC layer error: invalid result length (%d), "
+ "expected 2" % len(self.data))
+ self.payload = None
+ elif not self.data[0]:
+ self.fail_msg = self._EnsureErr(self.data[1])
+ self.payload = None
+ else:
+ # finally success
+ self.fail_msg = None
+ self.payload = data[1]
+
+ @staticmethod
+ def _EnsureErr(val):
+ """Helper to ensure we return a 'True' value for error."""
+ if val:
+ return val
+ else:
+ return "No error information"
- def Raise(self):
+ def Raise(self, msg, prereq=False):
- """If the result has failed, raise an OpExecError.
+ """If the result has failed, raise an OpExecError or OpPrereqError.
This is used so that LU code doesn't have to check for each
result, but instead can call this function.
+
+ Nothing happens when C{fail_msg} is empty.
+
+ @type msg: string or None
+ @param msg: text prepended to the failure message; if empty or
+ None, a default message naming the call and the node is used
+ @type prereq: boolean
+ @param prereq: if true, raise OpPrereqError instead of OpExecError
+ @raise errors.OpPrereqError: if the call failed and C{prereq} is true
+ @raise errors.OpExecError: if the call failed and C{prereq} is false
+
"""
- if self.failed:
- raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
- (self.call, self.node, self.error))
-
- def RemoteFailMsg(self):
- """Check if the remote procedure failed.
-
- This is valid only for RPC calls which return result of the form
- (status, data | error_msg).
+ if not self.fail_msg:
+ return
- @return: empty string for succcess, otherwise an error message
-
- """
- def _EnsureErr(val):
- """Helper to ensure we return a 'True' value for error."""
- if val:
- return val
- else:
- return "No error information"
-
- if self.failed:
- return _EnsureErr(self.error)
- if not isinstance(self.data, (tuple, list)):
- return "Invalid result type (%s)" % type(self.data)
- if len(self.data) != 2:
- return "Invalid result length (%d), expected 2" % len(self.data)
- if not self.data[0]:
- return _EnsureErr(self.data[1])
- return ""
+ if not msg: # one could pass None for default message
+ msg = ("Call '%s' to node '%s' has failed: %s" %
+ (self.call, self.node, self.fail_msg))
+ else:
+ msg = "%s: %s" % (msg, self.fail_msg)
+ if prereq:
+ ec = errors.OpPrereqError
+ else:
+ ec = errors.OpExecError
+ raise ec(msg)
class Client:
list of nodes, will contact (in parallel) all nodes, and return a
dict of results (key: node name, value: result).
- One current bug is that generic failure is still signalled by
+ One current bug is that generic failure is still signaled by
'False' result, which is not good. This overloading of values can
cause bugs.
"""Call nodes and return results.
@rtype: list
- @returns: List of RPC results
+ @return: List of RPC results
"""
- assert _http_manager, "RPC module not intialized"
+ assert _http_manager, "RPC module not initialized"
_http_manager.ExecRequests(self.nc.values())
else:
msg = req.resp_body
- logging.error("RPC error from node %s: %s", name, msg)
+ logging.error("RPC error in %s from node %s: %s",
+ self.procedure, name, msg)
results[name] = RpcResult(data=msg, failed=True, node=name,
call=self.procedure)
"""
self._cfg = cfg
- self.port = utils.GetNodeDaemonPort()
+ self.port = utils.GetDaemonPort(constants.NODED)
- def _InstDict(self, instance):
+ def _InstDict(self, instance, hvp=None, bep=None):
"""Convert the given instance to a dict.
This is done via the instance's ToDict() method and additionally
@type instance: L{objects.Instance}
@param instance: an Instance object
+ @type hvp: dict or None
+ @param hvp: a dictionary with overridden hypervisor parameters
+ @type bep: dict or None
+ @param bep: a dictionary with overridden backend parameters
@rtype: dict
@return: the instance dict, with the hvparams filled with the
cluster defaults
idict = instance.ToDict()
cluster = self._cfg.GetClusterInfo()
idict["hvparams"] = cluster.FillHV(instance)
+ if hvp is not None:
+ idict["hvparams"].update(hvp)
idict["beparams"] = cluster.FillBE(instance)
+ if bep is not None:
+ idict["beparams"].update(bep)
+ for nic in idict["nics"]:
+ nic['nicparams'] = objects.FillDict(
+ cluster.nicparams[constants.PP_DEFAULT],
+ nic['nicparams'])
return idict
- def _ConnectList(self, client, node_list):
+ def _ConnectList(self, client, node_list, call):
"""Helper for computing node addresses.
- @type client: L{Client}
+ @type client: L{ganeti.rpc.Client}
@param client: a C{Client} instance
@type node_list: list
@param node_list: the node list we should connect
+ @type call: string
+ @param call: the name of the remote procedure call, for filling in
+ correctly any eventual offline nodes' results
"""
all_nodes = self._cfg.GetAllNodesInfo()
for node in node_list:
if node in all_nodes:
if all_nodes[node].offline:
- skip_dict[node] = RpcResult(node=node, offline=True)
+ skip_dict[node] = RpcResult(node=node, offline=True, call=call)
continue
val = all_nodes[node].primary_ip
else:
client.ConnectList(name_list, address_list=addr_list)
return skip_dict
- def _ConnectNode(self, client, node):
+ def _ConnectNode(self, client, node, call):
"""Helper for computing one node's address.
- @type client: L{Client}
+ @type client: L{ganeti.rpc.Client}
@param client: a C{Client} instance
@type node: str
@param node: the node we should connect
+ @type call: string
+ @param call: the name of the remote procedure call, for filling in
+ correctly any eventual offline nodes' results
"""
node_info = self._cfg.GetNodeInfo(node)
if node_info is not None:
if node_info.offline:
- return RpcResult(node=node, offline=True)
+ return RpcResult(node=node, offline=True, call=call)
addr = node_info.primary_ip
else:
addr = None
"""
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, self.port)
- skip_dict = self._ConnectList(c, node_list)
+ skip_dict = self._ConnectList(c, node_list, procedure)
skip_dict.update(c.GetResults())
return skip_dict
"""
body = serializer.DumpJson(args, indent=False)
- c = Client(procedure, body, utils.GetNodeDaemonPort())
+ c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
c.ConnectList(node_list, address_list=address_list)
return c.GetResults()
"""
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, self.port)
- result = self._ConnectNode(c, node)
+ result = self._ConnectNode(c, node, procedure)
if result is None:
# we did connect, node is not offline
result = c.GetResults()[node]
"""
body = serializer.DumpJson(args, indent=False)
- c = Client(procedure, body, utils.GetNodeDaemonPort())
+ c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
c.ConnectNode(node)
return c.GetResults()[node]
# Begin RPC calls
#
- def call_volume_list(self, node_list, vg_name):
+ def call_lv_list(self, node_list, vg_name):
"""Gets the logical volumes present in a given volume group.
This is a multi-node call.
"""
- return self._MultiNodeCall(node_list, "volume_list", [vg_name])
+ return self._MultiNodeCall(node_list, "lv_list", [vg_name])
def call_vg_list(self, node_list):
"""Gets the volume group list.
"""
return self._MultiNodeCall(node_list, "vg_list", [])
+ def call_storage_list(self, node_list, su_name, su_args, name, fields):
+ """Get list of storage units.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "storage_list",
+ [su_name, su_args, name, fields])
+
+ def call_storage_modify(self, node, su_name, su_args, name, changes):
+ """Modify a storage unit.
+
+ This is a single-node call.
+
+ """
+ return self._SingleNodeCall(node, "storage_modify",
+ [su_name, su_args, name, changes])
+
+ def call_storage_execute(self, node, su_name, su_args, name, op):
+ """Executes an operation on a storage unit.
+
+ This is a single-node call.
+
+ """
+ return self._SingleNodeCall(node, "storage_execute",
+ [su_name, su_args, name, op])
+
def call_bridges_exist(self, node, bridges_list):
"""Checks if a node has all the bridges given.
"""
return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
- def call_instance_start(self, node, instance, extra_args):
+ def call_instance_start(self, node, instance, hvp, bep):
"""Starts an instance.
This is a single-node call.
"""
- return self._SingleNodeCall(node, "instance_start",
- [self._InstDict(instance), extra_args])
+ idict = self._InstDict(instance, hvp=hvp, bep=bep)
+ return self._SingleNodeCall(node, "instance_start", [idict])
def call_instance_shutdown(self, node, instance):
"""Stops an instance.
return self._SingleNodeCall(node, "instance_shutdown",
[self._InstDict(instance)])
+ def call_migration_info(self, node, instance):
+ """Gather the information necessary to prepare an instance migration.
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the node on which the instance is currently running
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+
+ """
+ return self._SingleNodeCall(node, "migration_info",
+ [self._InstDict(instance)])
+
+ def call_accept_instance(self, node, instance, info, target):
+ """Prepare a node to accept an instance.
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the target node for the migration
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type info: opaque/hypervisor specific (string/data)
+ @param info: result for the call_migration_info call
+ @type target: string
+ @param target: target hostname (usually ip address) (on the node itself)
+
+ """
+ return self._SingleNodeCall(node, "accept_instance",
+ [self._InstDict(instance), info, target])
+
+ def call_finalize_migration(self, node, instance, info, success):
+ """Finalize any target-node migration specific operation.
+
+ This is called both in case of a successful migration and in case of error
+ (in which case it should abort the migration).
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the target node for the migration
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type info: opaque/hypervisor specific (string/data)
+ @param info: result for the call_migration_info call
+ @type success: boolean
+ @param success: whether the migration was a success or a failure
+
+ """
+ return self._SingleNodeCall(node, "finalize_migration",
+ [self._InstDict(instance), info, success])
+
def call_instance_migrate(self, node, instance, target, live):
"""Migrate an instance.
return self._SingleNodeCall(node, "instance_migrate",
[self._InstDict(instance), target, live])
- def call_instance_reboot(self, node, instance, reboot_type, extra_args):
+ def call_instance_reboot(self, node, instance, reboot_type):
"""Reboots an instance.
This is a single-node call.
"""
return self._SingleNodeCall(node, "instance_reboot",
- [self._InstDict(instance), reboot_type,
- extra_args])
+ [self._InstDict(instance), reboot_type])
- def call_instance_os_add(self, node, inst):
+ def call_instance_os_add(self, node, inst, reinstall):
"""Installs an OS on the given instance.
This is a single-node call.
"""
return self._SingleNodeCall(node, "instance_os_add",
- [self._InstDict(inst)])
+ [self._InstDict(inst), reinstall])
def call_instance_run_rename(self, node, inst, old_name):
"""Run the OS rename script for an instance.
memory information
"""
- retux = self._MultiNodeCall(node_list, "node_info",
- [vg_name, hypervisor_type])
-
- for result in retux.itervalues():
- if result.failed or not isinstance(result.data, dict):
- result.data = {}
- if result.offline:
- log_name = None
- else:
- log_name = "call_node_info"
-
- utils.CheckDict(result.data, {
- 'memory_total' : '-',
- 'memory_dom0' : '-',
- 'memory_free' : '-',
- 'vg_size' : 'node_unreachable',
- 'vg_free' : '-',
- }, log_name)
- return retux
+ return self._MultiNodeCall(node_list, "node_info",
+ [vg_name, hypervisor_type])
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
"""Add a node to the cluster.
[checkdict, cluster_name])
@classmethod
- def call_node_start_master(cls, node, start_daemons):
+ def call_node_start_master(cls, node, start_daemons, no_voting):
"""Tells a node to activate itself as a master.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_start_master",
- [start_daemons])
+ [start_daemons, no_voting])
@classmethod
def call_node_stop_master(cls, node, stop_daemons):
This is a single-node call.
"""
- return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
- [dsk.ToDict() for dsk in disks])
+ result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
+ [dsk.ToDict() for dsk in disks])
+ if not result.fail_msg:
+ result.payload = [objects.BlockDevStatus.FromDict(i)
+ for i in result.payload]
+ return result
def call_blockdev_find(self, node, disk):
"""Request identification of a given block device.
This is a single-node call.
"""
- return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
+ result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
+ if not result.fail_msg and result.payload is not None:
+ result.payload = objects.BlockDevStatus.FromDict(result.payload)
+ return result
def call_blockdev_close(self, node, instance_name, disks):
"""Closes the given block devices.
params = [instance_name, [cf.ToDict() for cf in disks]]
return self._SingleNodeCall(node, "blockdev_close", params)
+ def call_blockdev_getsizes(self, node, disks):
+ """Returns the size of the given disks.
+
+ This is a single-node call.
+
+ """
+ params = [[cf.ToDict() for cf in disks]]
+ return self._SingleNodeCall(node, "blockdev_getsize", params)
+
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
"""Disconnects the network of the given drbd devices.
This is a multi-node call.
"""
- result = self._MultiNodeCall(node_list, "os_diagnose", [])
-
- for node_result in result.values():
- if not node_result.failed and node_result.data:
- node_result.data = [objects.OS.FromDict(oss)
- for oss in node_result.data]
- return result
+ return self._MultiNodeCall(node_list, "os_diagnose", [])
def call_os_get(self, node, name):
"""Returns an OS definition.
"""
result = self._SingleNodeCall(node, "os_get", [name])
- if not result.failed and isinstance(result.data, dict):
- result.data = objects.OS.FromDict(result.data)
+ # On success result.data is the raw (status, payload) pair and is never
+ # a dict, so the OS definition has to be looked up in, and written
+ # back to, result.payload.
+ if not result.fail_msg and isinstance(result.payload, dict):
+ result.payload = objects.OS.FromDict(result.payload)
return result
return self._SingleNodeCall(node, "blockdev_grow",
[cf_bdev.ToDict(), amount])
+ def call_blockdev_export(self, node, cf_bdev,
+ dest_node, dest_path, cluster_name):
+ """Export a given disk to another node.
+
+ This is a single-node call.
+
+ """
+ return self._SingleNodeCall(node, "blockdev_export",
+ [cf_bdev.ToDict(), dest_node, dest_path,
+ cluster_name])
+
def call_blockdev_snapshot(self, node, cf_bdev):
"""Request a snapshot of the given block device.
"""
flat_disks = []
for disk in snap_disks:
- flat_disks.append(disk.ToDict())
+ if isinstance(disk, bool):
+ flat_disks.append(disk)
+ else:
+ flat_disks.append(disk.ToDict())
return self._SingleNodeCall(node, "finalize_export",
[self._InstDict(instance), flat_disks])
This is a single-node call.
"""
- result = self._SingleNodeCall(node, "export_info", [path])
- if not result.failed and result.data:
- result.data = objects.SerializableConfigParser.Loads(str(result.data))
- return result
+ return self._SingleNodeCall(node, "export_info", [path])
def call_instance_os_import(self, node, inst, src_node, src_images,
cluster_name):
"""
return self._SingleNodeCall(node, "node_demote_from_mc", [])
+
+ def call_node_powercycle(self, node, hypervisor):
+ """Tries to powercycle a node.
+
+ This is a single-node call.
+
+ """
+ return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
+
+
def call_test_delay(self, node_list, duration):
"""Sleep for a fixed time on given node(s).
"""
cluster = self._cfg.GetClusterInfo()
- hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+ hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
return self._MultiNodeCall(node_list, "hypervisor_validate_params",
[hvname, hv_full])