if offline:
self.failed = True
self.error = "Node is marked offline"
- self.data = None
+ self.data = self.payload = None
elif failed:
self.error = data
- self.data = None
+ self.data = self.payload = None
else:
self.data = data
self.error = None
+ if isinstance(data, (tuple, list)) and len(data) == 2:
+ self.payload = data[1]
+ else:
+ self.payload = None
def Raise(self):
"""If the result has failed, raise an OpExecError.
raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
(self.call, self.node, self.error))
+ def RemoteFailMsg(self):
+ """Check if the remote procedure failed.
+
+ This is valid only for RPC calls which return a result of the form
+ (status, data | error_msg).
+
+ @return: empty string for success, otherwise an error message
+
+ """
+ def _EnsureErr(val):
+ """Helper to ensure we return a 'True' value for error."""
+ if val:
+ return val
+ else:
+ return "No error information"
+
+ if self.failed:
+ return _EnsureErr(self.error)
+ if not isinstance(self.data, (tuple, list)):
+ return "Invalid result type (%s)" % type(self.data)
+ if len(self.data) != 2:
+ return "Invalid result length (%d), expected 2" % len(self.data)
+ if not self.data[0]:
+ return _EnsureErr(self.data[1])
+ return ""
+
class Client:
"""RPC Client class.
"""Call nodes and return results.
@rtype: list
- @returns: List of RPC results
+ @return: List of RPC results
"""
assert _http_manager, "RPC module not intialized"
else:
msg = req.resp_body
- logging.error("RPC error from node %s: %s", name, msg)
+ logging.error("RPC error in %s from node %s: %s",
+ self.procedure, name, msg)
results[name] = RpcResult(data=msg, failed=True, node=name,
call=self.procedure)
self._cfg = cfg
self.port = utils.GetNodeDaemonPort()
- def _InstDict(self, instance):
+ def _InstDict(self, instance, hvp=None, bep=None):
"""Convert the given instance to a dict.
This is done via the instance's ToDict() method and additionally
@type instance: L{objects.Instance}
@param instance: an Instance object
+ @type hvp: dict or None
+ @param hvp: a dictionary with overridden hypervisor parameters
+ @type bep: dict or None
+ @param bep: a dictionary with overridden backend parameters
@rtype: dict
@return: the instance dict, with the hvparams filled with the
cluster defaults
idict = instance.ToDict()
cluster = self._cfg.GetClusterInfo()
idict["hvparams"] = cluster.FillHV(instance)
+ if hvp is not None:
+ idict["hvparams"].update(hvp)
idict["beparams"] = cluster.FillBE(instance)
+ if bep is not None:
+ idict["beparams"].update(bep)
return idict
- def _ConnectList(self, client, node_list):
+ def _ConnectList(self, client, node_list, call):
"""Helper for computing node addresses.
@type client: L{Client}
@param client: a C{Client} instance
@type node_list: list
@param node_list: the node list we should connect
+ @type call: string
+ @param call: the name of the remote procedure call, used to fill in
+ the results of any offline nodes correctly
"""
all_nodes = self._cfg.GetAllNodesInfo()
for node in node_list:
if node in all_nodes:
if all_nodes[node].offline:
- skip_dict[node] = RpcResult(node=node, offline=True)
+ skip_dict[node] = RpcResult(node=node, offline=True, call=call)
continue
val = all_nodes[node].primary_ip
else:
client.ConnectList(name_list, address_list=addr_list)
return skip_dict
- def _ConnectNode(self, client, node):
+ def _ConnectNode(self, client, node, call):
"""Helper for computing one node's address.
@type client: L{Client}
@param client: a C{Client} instance
@type node: str
@param node: the node we should connect
+ @type call: string
+ @param call: the name of the remote procedure call, used to fill in
+ the result correctly if the node is offline
"""
node_info = self._cfg.GetNodeInfo(node)
if node_info is not None:
if node_info.offline:
- return RpcResult(node=node, offline=True)
+ return RpcResult(node=node, offline=True, call=call)
addr = node_info.primary_ip
else:
addr = None
"""
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, self.port)
- skip_dict = self._ConnectList(c, node_list)
+ skip_dict = self._ConnectList(c, node_list, procedure)
skip_dict.update(c.GetResults())
return skip_dict
"""
body = serializer.DumpJson(args, indent=False)
c = Client(procedure, body, self.port)
- result = self._ConnectNode(c, node)
+ result = self._ConnectNode(c, node, procedure)
if result is None:
# we did connect, node is not offline
result = c.GetResults()[node]
"""
return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
- def call_instance_start(self, node, instance, extra_args):
+ def call_instance_start(self, node, instance, hvp, bep):
"""Starts an instance.
This is a single-node call.
"""
- return self._SingleNodeCall(node, "instance_start",
- [self._InstDict(instance), extra_args])
+ idict = self._InstDict(instance, hvp=hvp, bep=bep)
+ return self._SingleNodeCall(node, "instance_start", [idict])
def call_instance_shutdown(self, node, instance):
"""Stops an instance.
return self._SingleNodeCall(node, "instance_shutdown",
[self._InstDict(instance)])
+ def call_migration_info(self, node, instance):
+ """Gather the information necessary to prepare an instance migration.
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the node on which the instance is currently running
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+
+ """
+ return self._SingleNodeCall(node, "migration_info",
+ [self._InstDict(instance)])
+
+ def call_accept_instance(self, node, instance, info, target):
+ """Prepare a node to accept an instance.
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the target node for the migration
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type info: opaque/hypervisor specific (string/data)
+ @param info: result for the call_migration_info call
+ @type target: string
+ @param target: target hostname (usually ip address) (on the node itself)
+
+ """
+ return self._SingleNodeCall(node, "accept_instance",
+ [self._InstDict(instance), info, target])
+
+ def call_finalize_migration(self, node, instance, info, success):
+ """Finalize any target-node migration specific operation.
+
+ This is called both in case of a successful migration and in case of error
+ (in which case it should abort the migration).
+
+ This is a single-node call.
+
+ @type node: string
+ @param node: the target node for the migration
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type info: opaque/hypervisor specific (string/data)
+ @param info: result for the call_migration_info call
+ @type success: boolean
+ @param success: whether the migration was a success or a failure
+
+ """
+ return self._SingleNodeCall(node, "finalize_migration",
+ [self._InstDict(instance), info, success])
+
def call_instance_migrate(self, node, instance, target, live):
"""Migrate an instance.
return self._SingleNodeCall(node, "instance_migrate",
[self._InstDict(instance), target, live])
- def call_instance_reboot(self, node, instance, reboot_type, extra_args):
+ def call_instance_reboot(self, node, instance, reboot_type):
"""Reboots an instance.
This is a single-node call.
"""
return self._SingleNodeCall(node, "instance_reboot",
- [self._InstDict(instance), reboot_type,
- extra_args])
+ [self._InstDict(instance), reboot_type])
- def call_instance_os_add(self, node, inst):
+ def call_instance_os_add(self, node, inst, reinstall):
"""Installs an OS on the given instance.
This is a single-node call.
"""
return self._SingleNodeCall(node, "instance_os_add",
- [self._InstDict(inst)])
+ [self._InstDict(inst), reinstall])
def call_instance_run_rename(self, node, inst, old_name):
"""Run the OS rename script for an instance.
"""
return self._SingleNodeCall(node, "instance_info", [instance, hname])
+ def call_instance_migratable(self, node, instance):
+ """Checks whether the given instance can be migrated.
+
+ This is a single-node call.
+
+ @param node: the node to query
+ @type instance: L{objects.Instance}
+ @param instance: the instance to check
+
+
+ """
+ return self._SingleNodeCall(node, "instance_migratable",
+ [self._InstDict(instance)])
+
def call_all_instances_info(self, node_list, hypervisor_list):
"""Returns information about all instances on the given nodes.
@type node_list: list
@param node_list: the list of nodes to query
- @type vgname: C{string}
- @param vgname: the name of the volume group to ask for disk space
+ @type vg_name: C{string}
+ @param vg_name: the name of the volume group to ask for disk space
information
@type hypervisor_type: C{str}
@param hypervisor_type: the name of the hypervisor to ask for
for result in retux.itervalues():
if result.failed or not isinstance(result.data, dict):
result.data = {}
+ if result.offline:
+ log_name = None
+ else:
+ log_name = "call_node_info"
utils.CheckDict(result.data, {
'memory_total' : '-',
'memory_free' : '-',
'vg_size' : 'node_unreachable',
'vg_free' : '-',
- }, "call_node_info")
+ }, log_name)
return retux
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
"""
return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
- def call_blockdev_close(self, node, disks):
+ def call_blockdev_close(self, node, instance_name, disks):
"""Closes the given block devices.
This is a single-node call.
"""
- return self._SingleNodeCall(node, "blockdev_close",
- [cf.ToDict() for cf in disks])
+ params = [instance_name, [cf.ToDict() for cf in disks]]
+ return self._SingleNodeCall(node, "blockdev_close", params)
+
+ def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
+ """Disconnects the network of the given drbd devices.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "drbd_disconnect_net",
+ [nodes_ip, [cf.ToDict() for cf in disks]])
+
+ def call_drbd_attach_net(self, node_list, nodes_ip,
+ disks, instance_name, multimaster):
+ """Attaches the network of the given drbd devices.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "drbd_attach_net",
+ [nodes_ip, [cf.ToDict() for cf in disks],
+ instance_name, multimaster])
+
+ def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
+ """Waits until the synchronization of drbd devices is complete.
+
+ This is a multi-node call.
+
+ """
+ return self._MultiNodeCall(node_list, "drbd_wait_sync",
+ [nodes_ip, [cf.ToDict() for cf in disks]])
@classmethod
def call_upload_file(cls, node_list, file_name, address_list=None):
"""
result = self._MultiNodeCall(node_list, "os_diagnose", [])
- for node_name, node_result in result.iteritems():
+ for node_result in result.values():
if not node_result.failed and node_result.data:
node_result.data = [objects.OS.FromDict(oss)
for oss in node_result.data]
"""
return self._SingleNodeCall(node, "node_demote_from_mc", [])
+
+ def call_node_powercycle(self, node, hypervisor):
+ """Tries to powercycle a node.
+
+ This is a single-node call.
+
+ """
+ return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
+
+
def call_test_delay(self, node_list, duration):
"""Sleep for a fixed time on given node(s).
return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
- def call_jobqueue_rename(cls, node_list, address_list, old, new):
+ def call_jobqueue_rename(cls, node_list, address_list, rename):
"""Rename a job queue file.
This is a multi-node call.
"""
- return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
+ return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
address_list=address_list)
@classmethod
"""
cluster = self._cfg.GetClusterInfo()
- hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+ hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
return self._MultiNodeCall(node_list, "hypervisor_validate_params",
[hvname, hv_full])