X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/56aa9fd555ab30d0a002a239c6a5d7a88692c58e..7a8994d471db3244c67647186c5fca06569de924:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index 86a6a3f..70dd312 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -33,6 +33,8 @@ import os import socket import logging +import zlib +import base64 from ganeti import utils from ganeti import objects @@ -81,17 +83,38 @@ class RpcResult(object): calls we can't raise an exception just because one one out of many failed, and therefore we use this class to encapsulate the result. + @ivar data: the data payload, for successfull results, or None + @type failed: boolean + @ivar failed: whether the operation failed at RPC level (not + application level on the remote node) + @ivar call: the name of the RPC call + @ivar node: the name of the node to which we made the call + @ivar offline: whether the operation failed because the node was + offline, as opposed to actual failure; offline=True will always + imply failed=True, in order to allow simpler checking if + the user doesn't care about the exact failure mode + """ - def __init__(self, data, failed=False, call=None, node=None): + def __init__(self, data=None, failed=False, offline=False, + call=None, node=None): self.failed = failed - self.call = None - self.node = None - if failed: + self.offline = offline + self.call = call + self.node = node + if offline: + self.failed = True + self.error = "Node is marked offline" + self.data = self.payload = None + elif failed: self.error = data - self.data = None + self.data = self.payload = None else: self.data = data self.error = None + if isinstance(data, (tuple, list)) and len(data) == 2: + self.payload = data[1] + else: + self.payload = None def Raise(self): """If the result has failed, raise an OpExecError. @@ -104,6 +127,32 @@ class RpcResult(object): raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" % (self.call, self.node, self.error)) + def RemoteFailMsg(self): + """Check if the remote procedure failed. + + This is valid only for RPC calls which return result of the form + (status, data | error_msg). + + @return: empty string for succcess, otherwise an error message + + """ + def _EnsureErr(val): + """Helper to ensure we return a 'True' value for error.""" + if val: + return val + else: + return "No error information" + + if self.failed: + return _EnsureErr(self.error) + if not isinstance(self.data, (tuple, list)): + return "Invalid result type (%s)" % type(self.data) + if len(self.data) != 2: + return "Invalid result length (%d), expected 2" % len(self.data) + if not self.data[0]: + return _EnsureErr(self.data[1]) + return "" + class Client: """RPC Client class. @@ -168,7 +217,7 @@ class Client: """Call nodes and return results. @rtype: list - @returns: List of RPC results + @return: List of RPC results """ assert _http_manager, "RPC module not intialized" @@ -189,7 +238,8 @@ class Client: else: msg = req.resp_body - logging.error("RPC error from node %s: %s", name, msg) + logging.error("RPC error in %s from node %s: %s", + self.procedure, name, msg) results[name] = RpcResult(data=msg, failed=True, node=name, call=self.procedure) @@ -229,53 +279,66 @@ class RpcRunner(object): idict["beparams"] = cluster.FillBE(instance) return idict - def _ConnectList(self, client, node_list): + def _ConnectList(self, client, node_list, call): """Helper for computing node addresses. @type client: L{Client} @param client: a C{Client} instance @type node_list: list @param node_list: the node list we should connect + @type call: string + @param call: the name of the remote procedure call, for filling in + correctly any eventual offline nodes' results """ all_nodes = self._cfg.GetAllNodesInfo() + name_list = [] addr_list = [] + skip_dict = {} for node in node_list: if node in all_nodes: + if all_nodes[node].offline: + skip_dict[node] = RpcResult(node=node, offline=True, call=call) + continue val = all_nodes[node].primary_ip else: val = None addr_list.append(val) - client.ConnectList(node_list, address_list=addr_list) + name_list.append(node) + if name_list: + client.ConnectList(name_list, address_list=addr_list) + return skip_dict - def _ConnectNode(self, client, node): + def _ConnectNode(self, client, node, call): """Helper for computing one node's address. @type client: L{Client} @param client: a C{Client} instance @type node: str @param node: the node we should connect + @type call: string + @param call: the name of the remote procedure call, for filling in + correctly any eventual offline nodes' results """ node_info = self._cfg.GetNodeInfo(node) if node_info is not None: + if node_info.offline: + return RpcResult(node=node, offline=True, call=call) addr = node_info.primary_ip else: addr = None client.ConnectNode(node, address=addr) - def _MultiNodeCall(self, node_list, procedure, args, - address_list=None): + def _MultiNodeCall(self, node_list, procedure, args): """Helper for making a multi-node call """ body = serializer.DumpJson(args, indent=False) c = Client(procedure, body, self.port) - if address_list is None: - self._ConnectList(c, node_list) - else: - c.ConnectList(node_list, address_list=address_list) - return c.GetResults() + skip_dict = self._ConnectList(c, node_list, procedure) + skip_dict.update(c.GetResults()) + return skip_dict @classmethod def _StaticMultiNodeCall(cls, node_list, procedure, args, @@ -294,8 +357,11 @@ class RpcRunner(object): """ body = serializer.DumpJson(args, indent=False) c = Client(procedure, body, self.port) - self._ConnectNode(c, node) - return c.GetResults().get(node, False) + result = self._ConnectNode(c, node, procedure) + if result is None: + # we did connect, node is not offline + result = c.GetResults()[node] + return result @classmethod def _StaticSingleNodeCall(cls, node, procedure, args): @@ -305,7 +371,27 @@ class RpcRunner(object): body = serializer.DumpJson(args, indent=False) c = Client(procedure, body, utils.GetNodeDaemonPort()) c.ConnectNode(node) - return c.GetResults().get(node, False) + return c.GetResults()[node] + + @staticmethod + def _Compress(data): + """Compresses a string for transport over RPC. + + Small amounts of data are not compressed. + + @type data: str + @param data: Data + @rtype: tuple + @return: Encoded data to send + + """ + # Small amounts of data are not compressed + if len(data) < 512: + return (constants.RPC_ENCODING_NONE, data) + + # Compress with zlib and encode in base64 + return (constants.RPC_ENCODING_ZLIB_BASE64, + base64.b64encode(zlib.compress(data, 3))) # # Begin RPC calls @@ -339,14 +425,14 @@ class RpcRunner(object): """ return self._SingleNodeCall(node, "bridges_exist", [bridges_list]) - def call_instance_start(self, node, instance, extra_args): + def call_instance_start(self, node, instance): """Starts an instance. This is a single-node call. """ return self._SingleNodeCall(node, "instance_start", - [self._InstDict(instance), extra_args]) + [self._InstDict(instance)]) def call_instance_shutdown(self, node, instance): """Stops an instance. @@ -357,6 +443,59 @@ class RpcRunner(object): return self._SingleNodeCall(node, "instance_shutdown", [self._InstDict(instance)]) + def call_migration_info(self, node, instance): + """Gather the information necessary to prepare an instance migration. + + This is a single-node call. + + @type node: string + @param node: the node on which the instance is currently running + @type instance: C{objects.Instance} + @param instance: the instance definition + + """ + return self._SingleNodeCall(node, "migration_info", + [self._InstDict(instance)]) + + def call_accept_instance(self, node, instance, info, target): + """Prepare a node to accept an instance. + + This is a single-node call. + + @type node: string + @param node: the target node for the migration + @type instance: C{objects.Instance} + @param instance: the instance definition + @type info: opaque/hypervisor specific (string/data) + @param info: result for the call_migration_info call + @type target: string + @param target: target hostname (usually ip address) (on the node itself) + + """ + return self._SingleNodeCall(node, "accept_instance", + [self._InstDict(instance), info, target]) + + def call_finalize_migration(self, node, instance, info, success): + """Finalize any target-node migration specific operation. + + This is called both in case of a successful migration and in case of error + (in which case it should abort the migration). + + This is a single-node call. + + @type node: string + @param node: the target node for the migration + @type instance: C{objects.Instance} + @param instance: the instance definition + @type info: opaque/hypervisor specific (string/data) + @param info: result for the call_migration_info call + @type success: boolean + @param success: whether the migration was a success or a failure + + """ + return self._SingleNodeCall(node, "finalize_migration", + [self._InstDict(instance), info, success]) + def call_instance_migrate(self, node, instance, target, live): """Migrate an instance. @@ -376,15 +515,14 @@ class RpcRunner(object): return self._SingleNodeCall(node, "instance_migrate", [self._InstDict(instance), target, live]) - def call_instance_reboot(self, node, instance, reboot_type, extra_args): + def call_instance_reboot(self, node, instance, reboot_type): """Reboots an instance. This is a single-node call. """ return self._SingleNodeCall(node, "instance_reboot", - [self._InstDict(instance), reboot_type, - extra_args]) + [self._InstDict(instance), reboot_type]) def call_instance_os_add(self, node, inst): """Installs an OS on the given instance. @@ -419,6 +557,20 @@ class RpcRunner(object): """ return self._SingleNodeCall(node, "instance_info", [instance, hname]) + def call_instance_migratable(self, node, instance): + """Checks whether the given instance can be migrated. + + This is a single-node call. + + @param node: the node to query + @type instance: L{objects.Instance} + @param instance: the instance to check + + + """ + return self._SingleNodeCall(node, "instance_migratable", + [self._InstDict(instance)]) + def call_all_instances_info(self, node_list, hypervisor_list): """Returns information about all instances on the given nodes. @@ -475,8 +627,8 @@ class RpcRunner(object): @type node_list: list @param node_list: the list of nodes to query - @type vgname: C{string} - @param vgname: the name of the volume group to ask for disk space + @type vg_name: C{string} + @param vg_name: the name of the volume group to ask for disk space information @type hypervisor_type: C{str} @param hypervisor_type: the name of the hypervisor to ask for @@ -489,6 +641,10 @@ class RpcRunner(object): for result in retux.itervalues(): if result.failed or not isinstance(result.data, dict): result.data = {} + if result.offline: + log_name = None + else: + log_name = "call_node_info" utils.CheckDict(result.data, { 'memory_total' : '-', @@ -496,7 +652,7 @@ class RpcRunner(object): 'memory_free' : '-', 'vg_size' : 'node_unreachable', 'vg_free' : '-', - }, "call_node_info") + }, log_name) return retux def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub): @@ -634,14 +790,43 @@ class RpcRunner(object): """ return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()]) - def call_blockdev_close(self, node, disks): + def call_blockdev_close(self, node, instance_name, disks): """Closes the given block devices. This is a single-node call. """ - return self._SingleNodeCall(node, "blockdev_close", - [cf.ToDict() for cf in disks]) + params = [instance_name, [cf.ToDict() for cf in disks]] + return self._SingleNodeCall(node, "blockdev_close", params) + + def call_drbd_disconnect_net(self, node_list, nodes_ip, disks): + """Disconnects the network of the given drbd devices. + + This is a multi-node call. + + """ + return self._MultiNodeCall(node_list, "drbd_disconnect_net", + [nodes_ip, [cf.ToDict() for cf in disks]]) + + def call_drbd_attach_net(self, node_list, nodes_ip, + disks, instance_name, multimaster): + """Disconnects the given drbd devices. + + This is a multi-node call. + + """ + return self._MultiNodeCall(node_list, "drbd_attach_net", + [nodes_ip, [cf.ToDict() for cf in disks], + instance_name, multimaster]) + + def call_drbd_wait_sync(self, node_list, nodes_ip, disks): + """Waits for the synchronization of drbd devices is complete. + + This is a multi-node call. + + """ + return self._MultiNodeCall(node_list, "drbd_wait_sync", + [nodes_ip, [cf.ToDict() for cf in disks]]) @classmethod def call_upload_file(cls, node_list, file_name, address_list=None): @@ -661,7 +846,8 @@ class RpcRunner(object): to optimize the RPC speed """ - data = utils.ReadFile(file_name) + file_contents = utils.ReadFile(file_name) + data = cls._Compress(file_contents) st = os.stat(file_name) params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, st.st_atime, st.st_mtime] @@ -685,7 +871,7 @@ class RpcRunner(object): """ result = self._MultiNodeCall(node_list, "os_diagnose", []) - for node_name, node_result in result.iteritems(): + for node_result in result.values(): if not node_result.failed and node_result.data: node_result.data = [objects.OS.FromDict(oss) for oss in node_result.data] @@ -880,7 +1066,7 @@ class RpcRunner(object): """ return cls._StaticMultiNodeCall(node_list, "jobqueue_update", - [file_name, content], + [file_name, cls._Compress(content)], address_list=address_list) @classmethod @@ -893,13 +1079,13 @@ class RpcRunner(object): return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) @classmethod - def call_jobqueue_rename(cls, node_list, address_list, old, new): + def call_jobqueue_rename(cls, node_list, address_list, rename): """Rename a job queue file. This is a multi-node call. """ - return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new], + return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename, address_list=address_list) @classmethod