X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/0d1e78dda214045d1d752bc1f98289bdd2f37f0c..3201717461bc6e55f2deab7ad426ebfbb7b1e943:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index c4efad8..0bedb3c 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -47,6 +47,7 @@ from ganeti import netutils from ganeti import ssconf from ganeti import runtime from ganeti import compat +from ganeti import rpc_defs # Special module generated at build time from ganeti import _generated_rpc @@ -409,7 +410,137 @@ class _RpcProcessor: return self._CombineResults(results, requests, procedure) -class RpcRunner(_generated_rpc.RpcClientDefault, +class _RpcClientBase: + def __init__(self, resolver, encoder_fn, lock_monitor_cb=None): + """Initializes this class. + + """ + self._proc = _RpcProcessor(resolver, + netutils.GetDaemonPort(constants.NODED), + lock_monitor_cb=lock_monitor_cb) + self._encoder = compat.partial(self._EncodeArg, encoder_fn) + + @staticmethod + def _EncodeArg(encoder_fn, (argkind, value)): + """Encode argument. + + """ + if argkind is None: + return value + else: + return encoder_fn(argkind)(value) + + def _Call(self, cdef, node_list, args): + """Entry point for automatically generated RPC wrappers. + + """ + (procedure, _, timeout, argdefs, postproc_fn, _) = cdef + + if callable(timeout): + read_timeout = timeout(args) + else: + read_timeout = timeout + + body = serializer.DumpJson(map(self._encoder, + zip(map(compat.snd, argdefs), args)), + indent=False) + + result = self._proc(node_list, procedure, body, read_timeout=read_timeout) + + if postproc_fn: + return dict(map(lambda (key, value): (key, postproc_fn(value)), + result.items())) + else: + return result + + +def _ObjectToDict(value): + """Converts an object to a dictionary. + + @note: See L{objects}. + + """ + return value.ToDict() + + +def _ObjectListToDict(value): + """Converts a list of L{objects} to dictionaries. + + """ + return map(_ObjectToDict, value) + + +def _EncodeNodeToDiskDict(value): + """Encodes a dictionary with node name as key and disk objects as values. + + """ + return dict((name, _ObjectListToDict(disks)) + for name, disks in value.items()) + + +def _PrepareFileUpload(filename): + """Loads a file and prepares it for an upload to nodes. + + """ + data = _Compress(utils.ReadFile(filename)) + st = os.stat(filename) + getents = runtime.GetEnts() + return [filename, data, st.st_mode, getents.LookupUid(st.st_uid), + getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] + + +def _PrepareFinalizeExportDisks(snap_disks): + """Encodes disks for finalizing export. + + """ + flat_disks = [] + + for disk in snap_disks: + if isinstance(disk, bool): + flat_disks.append(disk) + else: + flat_disks.append(disk.ToDict()) + + return flat_disks + + +def _EncodeImportExportIO((ieio, ieioargs)): + """Encodes import/export I/O information. + + """ + if ieio == constants.IEIO_RAW_DISK: + assert len(ieioargs) == 1 + return (ieio, (ieioargs[0].ToDict(), )) + + if ieio == constants.IEIO_SCRIPT: + assert len(ieioargs) == 2 + return (ieio, (ieioargs[0].ToDict(), ieioargs[1])) + + return (ieio, ieioargs) + + +def _EncodeBlockdevRename(value): + """Encodes information for renaming block devices. + + """ + return [(d.ToDict(), uid) for d, uid in value] + + +#: Generic encoders +_ENCODERS = { + rpc_defs.ED_OBJECT_DICT: _ObjectToDict, + rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict, + rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict, + rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload, + rpc_defs.ED_COMPRESS: _Compress, + rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks, + rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO, + rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename, + } + + +class RpcRunner(_RpcClientBase, + _generated_rpc.RpcClientDefault, _generated_rpc.RpcClientBootstrap, _generated_rpc.RpcClientConfig): """RPC runner class. @@ -422,21 +553,31 @@ class RpcRunner(_generated_rpc.RpcClientDefault, @param context: Ganeti context """ + self._cfg = context.cfg + + encoders = _ENCODERS.copy() + + # Add encoders requiring configuration object + encoders.update({ + rpc_defs.ED_INST_DICT: self._InstDict, + rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep, + rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp, + }) + + # Resolver using configuration + resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo, + self._cfg.GetAllNodesInfo) + # Pylint doesn't recognize multiple inheritance properly, see # and # # pylint: disable=W0233 + _RpcClientBase.__init__(self, resolver, encoders.get, + lock_monitor_cb=context.glm.AddToLockMonitor) _generated_rpc.RpcClientConfig.__init__(self) _generated_rpc.RpcClientBootstrap.__init__(self) _generated_rpc.RpcClientDefault.__init__(self) - self._cfg = context.cfg - self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver, - self._cfg.GetNodeInfo, - self._cfg.GetAllNodesInfo), - netutils.GetDaemonPort(constants.NODED), - lock_monitor_cb=context.glm.AddToLockMonitor) - def _InstDict(self, instance, hvp=None, bep=None, osp=None): """Convert the given instance to a dict. @@ -485,152 +626,27 @@ class RpcRunner(_generated_rpc.RpcClientDefault, """ return self._InstDict(instance, osp=osparams) - def _Call(self, node_list, procedure, timeout, args): - """Entry point for automatically generated RPC wrappers. - - """ - body = serializer.DumpJson(args, indent=False) - return self._proc(node_list, procedure, body, read_timeout=timeout) - - @staticmethod - def _BlockdevFindPostProc(result): - if not result.fail_msg and result.payload is not None: - result.payload = objects.BlockDevStatus.FromDict(result.payload) - return result - - @staticmethod - def _BlockdevGetMirrorStatusPostProc(result): - if not result.fail_msg: - result.payload = [objects.BlockDevStatus.FromDict(i) - for i in result.payload] - return result - - @staticmethod - def _BlockdevGetMirrorStatusMultiPostProc(result): - for nres in result.values(): - if nres.fail_msg: - continue - - for idx, (success, status) in enumerate(nres.payload): - if success: - nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) - - return result - - @staticmethod - def _OsGetPostProc(result): - if not result.fail_msg and isinstance(result.payload, dict): - result.payload = objects.OS.FromDict(result.payload) - return result - - @staticmethod - def _PrepareFinalizeExportDisks(snap_disks): - flat_disks = [] - - for disk in snap_disks: - if isinstance(disk, bool): - flat_disks.append(disk) - else: - flat_disks.append(disk.ToDict()) - - return flat_disks - - @staticmethod - def _ImpExpStatusPostProc(result): - """Post-processor for import/export status. - - @rtype: Payload containing list of L{objects.ImportExportStatus} instances - @return: Returns a list of the state of each named import/export or None if - a status couldn't be retrieved - - """ - if not result.fail_msg: - decoded = [] - - for i in result.payload: - if i is None: - decoded.append(None) - continue - decoded.append(objects.ImportExportStatus.FromDict(i)) - - result.payload = decoded - - return result - - @staticmethod - def _EncodeImportExportIO(ieio, ieioargs): - """Encodes import/export I/O information. - - """ - if ieio == constants.IEIO_RAW_DISK: - assert len(ieioargs) == 1 - return (ieioargs[0].ToDict(), ) - - if ieio == constants.IEIO_SCRIPT: - assert len(ieioargs) == 2 - return (ieioargs[0].ToDict(), ieioargs[1]) - - return ieioargs - - @staticmethod - def _PrepareFileUpload(filename): - """Loads a file and prepares it for an upload to nodes. - - """ - data = _Compress(utils.ReadFile(filename)) - st = os.stat(filename) - getents = runtime.GetEnts() - return [filename, data, st.st_mode, getents.LookupUid(st.st_uid), - getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] - - # - # Begin RPC calls - # - - def call_test_delay(self, node_list, duration, read_timeout=None): - """Sleep for a fixed time on given node(s). - - This is a multi-node call. - - """ - assert read_timeout is None - return self.call_test_delay(node_list, duration, - read_timeout=int(duration + 5)) - - -class JobQueueRunner(_generated_rpc.RpcClientJobQueue): +class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue): """RPC wrappers for job queue. """ - _Compress = staticmethod(_Compress) - def __init__(self, context, address_list): """Initializes this class. """ - _generated_rpc.RpcClientJobQueue.__init__(self) - if address_list is None: resolver = _SsconfResolver else: # Caller provided an address list resolver = _StaticResolver(address_list) - self._proc = _RpcProcessor(resolver, - netutils.GetDaemonPort(constants.NODED), - lock_monitor_cb=context.glm.AddToLockMonitor) - - def _Call(self, node_list, procedure, timeout, args): - """Entry point for automatically generated RPC wrappers. - - """ - body = serializer.DumpJson(args, indent=False) - - return self._proc(node_list, procedure, body, read_timeout=timeout) + _RpcClientBase.__init__(self, resolver, _ENCODERS.get, + lock_monitor_cb=context.glm.AddToLockMonitor) + _generated_rpc.RpcClientJobQueue.__init__(self) -class BootstrapRunner(_generated_rpc.RpcClientBootstrap): +class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap): """RPC wrappers for bootstrapping. """ @@ -638,32 +654,22 @@ class BootstrapRunner(_generated_rpc.RpcClientBootstrap): """Initializes this class. """ + _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get) _generated_rpc.RpcClientBootstrap.__init__(self) - self._proc = _RpcProcessor(_SsconfResolver, - netutils.GetDaemonPort(constants.NODED)) - def _Call(self, node_list, procedure, timeout, args): - """Entry point for automatically generated RPC wrappers. - - """ - body = serializer.DumpJson(args, indent=False) - - return self._proc(node_list, procedure, body, read_timeout=timeout) - - -class ConfigRunner(_generated_rpc.RpcClientConfig): +class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig): """RPC wrappers for L{config}. """ - _PrepareFileUpload = \ - staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212 - - def __init__(self, address_list): + def __init__(self, context, address_list): """Initializes this class. """ - _generated_rpc.RpcClientConfig.__init__(self) + if context: + lock_monitor_cb = context.glm.AddToLockMonitor + else: + lock_monitor_cb = None if address_list is None: resolver = _SsconfResolver @@ -671,13 +677,6 @@ class ConfigRunner(_generated_rpc.RpcClientConfig): # Caller provided an address list resolver = _StaticResolver(address_list) - self._proc = _RpcProcessor(resolver, - netutils.GetDaemonPort(constants.NODED)) - - def _Call(self, node_list, procedure, timeout, args): - """Entry point for automatically generated RPC wrappers. - - """ - body = serializer.DumpJson(args, indent=False) - - return self._proc(node_list, procedure, body, read_timeout=timeout) + _RpcClientBase.__init__(self, resolver, _ENCODERS.get, + lock_monitor_cb=lock_monitor_cb) + _generated_rpc.RpcClientConfig.__init__(self)