X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/fb1ffbca53e1fa9f7167ab04ec9c8299b64c43d6..3201717461bc6e55f2deab7ad426ebfbb7b1e943:/lib/rpc.py

diff --git a/lib/rpc.py b/lib/rpc.py
index bffd0d9..0bedb3c 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -47,6 +47,7 @@ from ganeti import netutils
 from ganeti import ssconf
 from ganeti import runtime
 from ganeti import compat
+from ganeti import rpc_defs
 
 # Special module generated at build time
 from ganeti import _generated_rpc
@@ -71,16 +72,6 @@ _TMO_SLOW = 3600 # one hour
 _TMO_4HRS = 4 * 3600
 _TMO_1DAY = 86400
 
-# Timeout table that will be built later by decorators
-# Guidelines for choosing timeouts:
-# - call used during watcher: timeout -> 1min, _TMO_URGENT
-# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
-# - other calls: 15 min, _TMO_NORMAL
-# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
-
-_TIMEOUTS = {
-}
-
 #: Special value to describe an offline host
 _OFFLINE = object()
 
@@ -127,21 +118,6 @@ def _ConfigRpcCurl(curl):
   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
 
 
-def _RpcTimeout(secs):
-  """Timeout decorator.
-
-  When applied to a rpc call_* function, it updates the global timeout
-  table with the given function/timeout.
-
-  """
-  def decorator(f):
-    name = f.__name__
-    assert name.startswith("call_")
-    _TIMEOUTS[name[len("call_"):]] = secs
-    return f
-  return decorator
-
-
 def RunWithRPC(fn):
   """RPC-wrapper decorator.
 
@@ -420,9 +396,6 @@ class _RpcProcessor:
     @param read_timeout: Read timeout for request
 
     """
-    if read_timeout is None:
-      read_timeout = _TIMEOUTS.get(procedure, None)
-
     assert read_timeout is not None, \
       "Missing RPC read timeout for procedure '%s'" % procedure
 
@@ -437,381 +410,266 @@ class _RpcProcessor:
     return self._CombineResults(results, requests, procedure)
 
 
-class RpcRunner(_generated_rpc.RpcClientDefault):
-  """RPC runner class.
-
-  """
-  def __init__(self, context):
-    """Initialized the RPC runner.
-
-    @type context: C{masterd.GanetiContext}
-    @param context: Ganeti context
+class _RpcClientBase:
+  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
+    """Initializes this class.
 
     """
-    _generated_rpc.RpcClientDefault.__init__(self)
-
-    self._cfg = context.cfg
-    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
-                                              self._cfg.GetNodeInfo,
-                                              self._cfg.GetAllNodesInfo),
+    self._proc = _RpcProcessor(resolver,
                                netutils.GetDaemonPort(constants.NODED),
-                               lock_monitor_cb=context.glm.AddToLockMonitor)
-
-  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
-    """Convert the given instance to a dict.
-
-    This is done via the instance's ToDict() method and additionally
-    we fill the hvparams with the cluster defaults.
-
-    @type instance: L{objects.Instance}
-    @param instance: an Instance object
-    @type hvp: dict or None
-    @param hvp: a dictionary with overridden hypervisor parameters
-    @type bep: dict or None
-    @param bep: a dictionary with overridden backend parameters
-    @type osp: dict or None
-    @param osp: a dictionary with overridden os parameters
-    @rtype: dict
-    @return: the instance dict, with the hvparams filled with the
-        cluster defaults
-
-    """
-    idict = instance.ToDict()
-    cluster = self._cfg.GetClusterInfo()
-    idict["hvparams"] = cluster.FillHV(instance)
-    if hvp is not None:
-      idict["hvparams"].update(hvp)
-    idict["beparams"] = cluster.FillBE(instance)
-    if bep is not None:
-      idict["beparams"].update(bep)
-    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
-    if osp is not None:
-      idict["osparams"].update(osp)
-    for nic in idict["nics"]:
-      nic['nicparams'] = objects.FillDict(
-        cluster.nicparams[constants.PP_DEFAULT],
-        nic['nicparams'])
-    return idict
-
-  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
-    """Helper for making a multi-node call
-
-    """
-    body = serializer.DumpJson(args, indent=False)
-    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
-
-  def _Call(self, node_list, procedure, timeout, args):
-    """Entry point for automatically generated RPC wrappers.
-
-    """
-    return self._MultiNodeCall(node_list, procedure, args, read_timeout=timeout)
+                               lock_monitor_cb=lock_monitor_cb)
+    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
 
   @staticmethod
-  def _StaticMultiNodeCall(node_list, procedure, args,
-                           address_list=None, read_timeout=None):
-    """Helper for making a multi-node static call
+  def _EncodeArg(encoder_fn, (argkind, value)):
+    """Encode argument.
 
     """
-    body = serializer.DumpJson(args, indent=False)
-
-    if address_list is None:
-      resolver = _SsconfResolver
+    if argkind is None:
+      return value
     else:
-      # Caller provided an address list
-      resolver = _StaticResolver(address_list)
-
-    proc = _RpcProcessor(resolver,
-                         netutils.GetDaemonPort(constants.NODED))
-    return proc(node_list, procedure, body, read_timeout=read_timeout)
-
-  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
-    """Helper for making a single-node call
-
-    """
-    body = serializer.DumpJson(args, indent=False)
-    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
+      return encoder_fn(argkind)(value)
 
-  @classmethod
-  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
-    """Helper for making a single-node static call
+  def _Call(self, cdef, node_list, args):
+    """Entry point for automatically generated RPC wrappers.
 
""" - body = serializer.DumpJson(args, indent=False) - proc = _RpcProcessor(_SsconfResolver, - netutils.GetDaemonPort(constants.NODED)) - return proc([node], procedure, body, read_timeout=read_timeout)[node] - - @staticmethod - def _BlockdevFindPostProc(result): - if not result.fail_msg and result.payload is not None: - result.payload = objects.BlockDevStatus.FromDict(result.payload) - return result - - @staticmethod - def _BlockdevGetMirrorStatusPostProc(result): - if not result.fail_msg: - result.payload = [objects.BlockDevStatus.FromDict(i) - for i in result.payload] - return result + (procedure, _, timeout, argdefs, postproc_fn, _) = cdef - @staticmethod - def _BlockdevGetMirrorStatusMultiPostProc(result): - for nres in result.values(): - if nres.fail_msg: - continue - - for idx, (success, status) in enumerate(nres.payload): - if success: - nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) - - return result + if callable(timeout): + read_timeout = timeout(args) + else: + read_timeout = timeout - @staticmethod - def _OsGetPostProc(result): - if not result.fail_msg and isinstance(result.payload, dict): - result.payload = objects.OS.FromDict(result.payload) - return result + body = serializer.DumpJson(map(self._encoder, + zip(map(compat.snd, argdefs), args)), + indent=False) - @staticmethod - def _PrepareFinalizeExportDisks(snap_disks): - flat_disks = [] + result = self._proc(node_list, procedure, body, read_timeout=read_timeout) - for disk in snap_disks: - if isinstance(disk, bool): - flat_disks.append(disk) - else: - flat_disks.append(disk.ToDict()) + if postproc_fn: + return dict(map(lambda (key, value): (key, postproc_fn(value)), + result.items())) + else: + return result - return flat_disks - @staticmethod - def _ImpExpStatusPostProc(result): - """Post-processor for import/export status. +def _ObjectToDict(value): + """Converts an object to a dictionary. - @rtype: Payload containing list of L{objects.ImportExportStatus} instances - @return: Returns a list of the state of each named import/export or None if - a status couldn't be retrieved - - """ - if not result.fail_msg: - decoded = [] + @note: See L{objects}. - for i in result.payload: - if i is None: - decoded.append(None) - continue - decoded.append(objects.ImportExportStatus.FromDict(i)) + """ + return value.ToDict() - result.payload = decoded - return result +def _ObjectListToDict(value): + """Converts a list of L{objects} to dictionaries. - @staticmethod - def _EncodeImportExportIO(ieio, ieioargs): - """Encodes import/export I/O information. + """ + return map(_ObjectToDict, value) - """ - if ieio == constants.IEIO_RAW_DISK: - assert len(ieioargs) == 1 - return (ieioargs[0].ToDict(), ) - if ieio == constants.IEIO_SCRIPT: - assert len(ieioargs) == 2 - return (ieioargs[0].ToDict(), ieioargs[1]) +def _EncodeNodeToDiskDict(value): + """Encodes a dictionary with node name as key and disk objects as values. - return ieioargs + """ + return dict((name, _ObjectListToDict(disks)) + for name, disks in value.items()) - # - # Begin RPC calls - # - @_RpcTimeout(_TMO_NORMAL) - def call_instance_start(self, node, instance, hvp, bep, startup_paused): - """Starts an instance. +def _PrepareFileUpload(filename): + """Loads a file and prepares it for an upload to nodes. - This is a single-node call. 
+  """
+  data = _Compress(utils.ReadFile(filename))
+  st = os.stat(filename)
+  getents = runtime.GetEnts()
+  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
+          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
 
-    """
-    idict = self._InstDict(instance, hvp=hvp, bep=bep)
-    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
 
-  @_RpcTimeout(_TMO_1DAY)
-  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
-    """Installs an OS on the given instance.
+def _PrepareFinalizeExportDisks(snap_disks):
+  """Encodes disks for finalizing export.
 
-    This is a single-node call.
+  """
+  flat_disks = []
 
-    """
-    return self._SingleNodeCall(node, "instance_os_add",
-                                [self._InstDict(inst, osp=osparams),
-                                 reinstall, debug])
+  for disk in snap_disks:
+    if isinstance(disk, bool):
+      flat_disks.append(disk)
+    else:
+      flat_disks.append(disk.ToDict())
 
-  @classmethod
-  @_RpcTimeout(_TMO_FAST)
-  def call_node_start_master_daemons(cls, node, no_voting):
-    """Starts master daemons on a node.
+  return flat_disks
 
-    This is a single-node call.
 
-    """
-    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
-                                     [no_voting])
+def _EncodeImportExportIO((ieio, ieioargs)):
+  """Encodes import/export I/O information.
 
-  @classmethod
-  @_RpcTimeout(_TMO_FAST)
-  def call_node_activate_master_ip(cls, node):
-    """Activates master IP on a node.
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    return (ieio, (ieioargs[0].ToDict(), ))
 
-    This is a single-node call.
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
 
-    """
-    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
+  return (ieio, ieioargs)
 
-  @classmethod
-  @_RpcTimeout(_TMO_FAST)
-  def call_node_stop_master(cls, node):
-    """Deactivates master IP and stops master daemons on a node.
 
-    This is a single-node call.
+def _EncodeBlockdevRename(value):
+  """Encodes information for renaming block devices.
 
-    """
-    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
+  """
+  return [(d.ToDict(), uid) for d, uid in value]
+
+
+#: Generic encoders
+_ENCODERS = {
+  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
+  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
+  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
+  rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
+  rpc_defs.ED_COMPRESS: _Compress,
+  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
+  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
+  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
+  }
+
+
+class RpcRunner(_RpcClientBase,
+                _generated_rpc.RpcClientDefault,
+                _generated_rpc.RpcClientBootstrap,
+                _generated_rpc.RpcClientConfig):
+  """RPC runner class.
 
-  @classmethod
-  @_RpcTimeout(_TMO_FAST)
-  def call_node_deactivate_master_ip(cls, node):
-    """Deactivates master IP on a node.
+  """
+  def __init__(self, context):
+    """Initialized the RPC runner.
 
-    This is a single-node call.
+    @type context: C{masterd.GanetiContext}
+    @param context: Ganeti context
 
     """
-    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
-
-  @classmethod
-  @_RpcTimeout(_TMO_FAST)
-  def call_node_change_master_netmask(cls, node, netmask):
-    """Change master IP netmask.
+    self._cfg = context.cfg
 
-    This is a single-node call.
+    encoders = _ENCODERS.copy()
+
+    # Add encoders requiring configuration object
+    encoders.update({
+      rpc_defs.ED_INST_DICT: self._InstDict,
+      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
+      rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
+      })
+
+    # Resolver using configuration
+    resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
+                              self._cfg.GetAllNodesInfo)
+
+    # Pylint doesn't recognize multiple inheritance properly, see
+    # and
+    #
+    # pylint: disable=W0233
+    _RpcClientBase.__init__(self, resolver, encoders.get,
+                            lock_monitor_cb=context.glm.AddToLockMonitor)
+    _generated_rpc.RpcClientConfig.__init__(self)
+    _generated_rpc.RpcClientBootstrap.__init__(self)
+    _generated_rpc.RpcClientDefault.__init__(self)
 
-    """
-    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
-                                     [netmask])
+  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
+    """Convert the given instance to a dict.
 
-  @classmethod
-  @_RpcTimeout(_TMO_URGENT)
-  def call_master_info(cls, node_list):
-    """Query master info.
+    This is done via the instance's ToDict() method and additionally
+    we fill the hvparams with the cluster defaults.
 
-    This is a multi-node call.
+    @type instance: L{objects.Instance}
+    @param instance: an Instance object
+    @type hvp: dict or None
+    @param hvp: a dictionary with overridden hypervisor parameters
+    @type bep: dict or None
+    @param bep: a dictionary with overridden backend parameters
+    @type osp: dict or None
+    @param osp: a dictionary with overridden os parameters
+    @rtype: dict
+    @return: the instance dict, with the hvparams filled with the
+        cluster defaults
 
     """
-    # TODO: should this method query down nodes?
-    return cls._StaticMultiNodeCall(node_list, "master_info", [])
-
-  @classmethod
-  @_RpcTimeout(_TMO_URGENT)
-  def call_version(cls, node_list):
-    """Query node version.
+    idict = instance.ToDict()
+    cluster = self._cfg.GetClusterInfo()
+    idict["hvparams"] = cluster.FillHV(instance)
+    if hvp is not None:
+      idict["hvparams"].update(hvp)
+    idict["beparams"] = cluster.FillBE(instance)
+    if bep is not None:
+      idict["beparams"].update(bep)
+    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
+    if osp is not None:
+      idict["osparams"].update(osp)
+    for nic in idict["nics"]:
+      nic['nicparams'] = objects.FillDict(
+        cluster.nicparams[constants.PP_DEFAULT],
+        nic['nicparams'])
+    return idict
 
-    This is a multi-node call.
+  def _InstDictHvpBep(self, (instance, hvp, bep)):
+    """Wrapper for L{_InstDict}.
 
     """
-    return cls._StaticMultiNodeCall(node_list, "version", [])
-
-  @classmethod
-  @_RpcTimeout(_TMO_NORMAL)
-  def call_upload_file(cls, node_list, file_name, address_list=None):
-    """Upload a file.
-
-    The node will refuse the operation in case the file is not on the
-    approved file list.
+    return self._InstDict(instance, hvp=hvp, bep=bep)
 
-    This is a multi-node call.
-
-    @type node_list: list
-    @param node_list: the list of node names to upload to
-    @type file_name: str
-    @param file_name: the filename to upload
-    @type address_list: list or None
-    @keyword address_list: an optional list of node addresses, in order
-        to optimize the RPC speed
-
-    """
-    file_contents = utils.ReadFile(file_name)
-    data = _Compress(file_contents)
-    st = os.stat(file_name)
-    getents = runtime.GetEnts()
-    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
-              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
-    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
-                                    address_list=address_list)
-
-  @classmethod
-  @_RpcTimeout(_TMO_NORMAL)
-  def call_write_ssconf_files(cls, node_list, values):
-    """Write ssconf files.
-
-    This is a multi-node call.
+  def _InstDictOsp(self, (instance, osparams)):
+    """Wrapper for L{_InstDict}.
 
     """
-    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
+    return self._InstDict(instance, osp=osparams)
 
-  @classmethod
-  @_RpcTimeout(_TMO_NORMAL)
-  def call_node_leave_cluster(cls, node, modify_ssh_setup):
-    """Requests a node to clean the cluster information it has.
 
-    This will remove the configuration information from the ganeti data
-    dir.
+class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
+  """RPC wrappers for job queue.
 
-    This is a single-node call.
+  """
+  def __init__(self, context, address_list):
+    """Initializes this class.
 
     """
-    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
-                                     [modify_ssh_setup])
-
-  def call_test_delay(self, node_list, duration, read_timeout=None):
-    """Sleep for a fixed time on given node(s).
-
-    This is a multi-node call.
+    if address_list is None:
+      resolver = _SsconfResolver
+    else:
+      # Caller provided an address list
+      resolver = _StaticResolver(address_list)
 
-    """
-    assert read_timeout is None
-    return self.call_test_delay(node_list, duration,
-                                read_timeout=int(duration + 5))
+    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
+                            lock_monitor_cb=context.glm.AddToLockMonitor)
+    _generated_rpc.RpcClientJobQueue.__init__(self)
 
-  @_RpcTimeout(_TMO_NORMAL)
-  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
-    """Validate the hypervisor params.
 
-    This is a multi-node call.
+class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
+  """RPC wrappers for bootstrapping.
 
-    @type node_list: list
-    @param node_list: the list of nodes to query
-    @type hvname: string
-    @param hvname: the hypervisor name
-    @type hvparams: dict
-    @param hvparams: the hypervisor parameters to be validated
+  """
+  def __init__(self):
+    """Initializes this class.
 
     """
-    cluster = self._cfg.GetClusterInfo()
-    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
-    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
-                               [hvname, hv_full])
+    _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
+    _generated_rpc.RpcClientBootstrap.__init__(self)
 
 
-class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
-  """RPC wrappers for job queue.
+class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
+  """RPC wrappers for L{config}.
 
   """
-  _Compress = staticmethod(_Compress)
-
   def __init__(self, context, address_list):
     """Initializes this class.
""" - _generated_rpc.RpcClientJobQueue.__init__(self) + if context: + lock_monitor_cb = context.glm.AddToLockMonitor + else: + lock_monitor_cb = None if address_list is None: resolver = _SsconfResolver @@ -819,14 +677,6 @@ class JobQueueRunner(_generated_rpc.RpcClientJobQueue): # Caller provided an address list resolver = _StaticResolver(address_list) - self._proc = _RpcProcessor(resolver, - netutils.GetDaemonPort(constants.NODED), - lock_monitor_cb=context.glm.AddToLockMonitor) - - def _Call(self, node_list, procedure, timeout, args): - """Entry point for automatically generated RPC wrappers. - - """ - body = serializer.DumpJson(args, indent=False) - - return self._proc(node_list, procedure, body, read_timeout=timeout) + _RpcClientBase.__init__(self, resolver, _ENCODERS.get, + lock_monitor_cb=lock_monitor_cb) + _generated_rpc.RpcClientConfig.__init__(self)