+class _RpcClientBase:
+ def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
+ _req_process_fn=None):
+ """Initializes this class.
+
+ """
+ proc = _RpcProcessor(resolver,
+ netutils.GetDaemonPort(constants.NODED),
+ lock_monitor_cb=lock_monitor_cb)
+ self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
+ self._encoder = compat.partial(self._EncodeArg, encoder_fn)
+
+ @staticmethod
+ def _EncodeArg(encoder_fn, (argkind, value)):
+ """Encode argument.
+
+ """
+ if argkind is None:
+ return value
+ else:
+ return encoder_fn(argkind)(value)
+
+ def _Call(self, cdef, node_list, args):
+ """Entry point for automatically generated RPC wrappers.
+
+ """
+ (procedure, _, resolver_opts, timeout, argdefs,
+ prep_fn, postproc_fn, _) = cdef
+
+ if callable(timeout):
+ read_timeout = timeout(args)
+ else:
+ read_timeout = timeout
+
+ if callable(resolver_opts):
+ req_resolver_opts = resolver_opts(args)
+ else:
+ req_resolver_opts = resolver_opts
+
+ if len(args) != len(argdefs):
+ raise errors.ProgrammerError("Number of passed arguments doesn't match")
+
+ enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
+ if prep_fn is None:
+ # for a no-op prep_fn, we serialise the body once, and then we
+ # reuse it in the dictionary values
+ body = serializer.DumpJson(enc_args)
+ pnbody = dict((n, body) for n in node_list)
+ else:
+ # for a custom prep_fn, we pass the encoded arguments and the
+ # node name to the prep_fn, and we serialise its return value
+ assert callable(prep_fn)
+ pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
+ for n in node_list)
+
+ result = self._proc(node_list, procedure, pnbody, read_timeout,
+ req_resolver_opts)
+
+ if postproc_fn:
+ return dict(map(lambda (key, value): (key, postproc_fn(value)),
+ result.items()))
+ else:
+ return result
+
+
+def _ObjectToDict(value):
+ """Converts an object to a dictionary.
+
+ @note: See L{objects}.
+
+ """
+ return value.ToDict()
+
+
def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  @param value: iterable of objects accepted by L{_ObjectToDict}
  @rtype: list
  @return: the converted objects, in the same order

  """
  # A list comprehension always yields a list; map() only does so on
  # Python 2 and returns a single-use iterator on Python 3
  return [_ObjectToDict(obj) for obj in value]
+
+
def _EncodeNodeToDiskDict(value):
  """Encodes a mapping from node names to lists of disk objects.

  @type value: dict
  @param value: node name as key, list of disk objects as value
  @rtype: dict
  @return: same keys, with every disk list converted by
    L{_ObjectListToDict}

  """
  encoded = {}

  for (node_name, node_disks) in value.items():
    encoded[node_name] = _ObjectListToDict(node_disks)

  return encoded
+
+
def _PrepareFileUpload(getents_fn, filename):
  """Reads a file and builds the argument list for uploading it to nodes.

  @param getents_fn: callable returning the object used to look up user
    and group IDs; L{runtime.GetEnts} is used if C{None}
  @param filename: path of the file to read
  @rtype: list
  @return: virtual path, compressed contents, mode, result of the uid
    lookup, result of the gid lookup, atime and mtime

  """
  # The helper is handed to ReadFile as "preread" callback; its "st"
  # attribute is read afterwards (presumably the stat result collected
  # during the read -- see utils.FileStatHelper)
  stat_helper = utils.FileStatHelper()
  contents = utils.ReadFile(filename, preread=stat_helper)
  payload = _Compress(contents)
  file_stat = stat_helper.st

  if getents_fn is None:
    resolver = runtime.GetEnts()
  else:
    resolver = getents_fn()

  return [
    vcluster.MakeVirtualPath(filename),
    payload,
    file_stat.st_mode,
    resolver.LookupUid(file_stat.st_uid),
    resolver.LookupGid(file_stat.st_gid),
    file_stat.st_atime,
    file_stat.st_mtime,
    ]
+
+
+def _PrepareFinalizeExportDisks(snap_disks):
+ """Encodes disks for finalizing export.
+
+ """
+ flat_disks = []
+
+ for disk in snap_disks:
+ if isinstance(disk, bool):
+ flat_disks.append(disk)
+ else:
+ flat_disks.append(disk.ToDict())
+
+ return flat_disks
+
+
def _EncodeImportExportIO(ieinfo):
  """Encodes import/export I/O information.

  @type ieinfo: tuple
  @param ieinfo: C{(ieio, ieioargs)} pair; C{ieio} is the I/O type and
    C{ieioargs} its arguments
  @rtype: tuple
  @return: C{(ieio, ieioargs)} with the first argument converted through
    its C{ToDict} method for L{constants.IEIO_RAW_DISK} and
    L{constants.IEIO_SCRIPT}; unchanged for every other type

  """
  # Unpacked here instead of in the signature: tuple parameters were
  # removed from the language in Python 3 (PEP 3113)
  (ieio, ieioargs) = ieinfo

  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  return (ieio, ieioargs)
+
+
+def _EncodeBlockdevRename(value):
+ """Encodes information for renaming block devices.
+
+ """
+ return [(d.ToDict(), uid) for d, uid in value]
+
+
def MakeLegacyNodeInfo(data, require_vg_info=True):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use
  cases, but some require information from more than one volume group or
  hypervisor.

  @param data: triple of boot ID, list of volume group information and a
    one-element sequence with the hypervisor information
  @param require_vg_info: raise an error if the returned vg_info
    doesn't have any values; if false, an empty vg_info is skipped
  @return: hypervisor information joined with the boot ID (key
    C{"bootid"}) and, if used, the volume group information

  """
  (bootid, vgs_info, (hv_info, )) = data

  merged = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  if not (require_vg_info or vgs_info):
    return merged

  # Unpacking fails unless there is exactly one volume group entry
  (vg0_info, ) = vgs_info

  return utils.JoinDisjointDicts(vg0_info, merged)
+
+
def _AnnotateDParamsDRBD(disk, ld_params):
  """Annotates just DRBD disks layouts.

  The disk and its two children are modified in place.

  @param disk: disk object whose C{dev_type} is L{constants.LD_DRBD8}
  @type ld_params: tuple
  @param ld_params: C{(drbd_params, data_params, meta_params)}, applied
    to the disk itself, its first child and its second child
  @return: the same disk object

  """
  # Unpacked here instead of in the signature: tuple parameters were
  # removed from the language in Python 3 (PEP 3113)
  (drbd_params, data_params, meta_params) = ld_params

  assert disk.dev_type == constants.LD_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
+
+
def _AnnotateDParamsGeneric(disk, ld_params):
  """Generic disk parameter annotation routine.

  The disk is modified in place.

  @param disk: disk object whose C{dev_type} is not
    L{constants.LD_DRBD8}
  @type ld_params: tuple
  @param ld_params: one-element tuple C{(params, )}
  @return: the same disk object

  """
  # Unpacked here instead of in the signature: tuple parameters were
  # removed from the language in Python 3 (PEP 3113)
  (params, ) = ld_params

  assert disk.dev_type != constants.LD_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
+
+
def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  Every disk is copied before being annotated, the objects passed in are
  not modified.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  # Start with the generic routine, then override it for the templates
  # needing special treatment
  annotate = _AnnotateDParamsGeneric
  if template == constants.DT_DRBD8:
    annotate = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    # Nothing to annotate, the copy is returned unchanged
    annotate = lambda disk, _: disk

  annotated = []
  for disk in disks:
    annotated.append(annotate(disk.Copy(), ld_params))

  return annotated
+
+
def _GetESFlag(cfg, nodename):
  """Returns the exclusive storage flag of a single node.

  @param cfg: object providing C{GetNodeInfo} and C{GetNdParams}
  @param nodename: name of the node to look up
  @return: value stored under L{constants.ND_EXCLUSIVE_STORAGE} in the
    node's parameters
  @raise errors.OpPrereqError: if C{cfg.GetNodeInfo} returns C{None} for
    the given name

  """
  node_info = cfg.GetNodeInfo(nodename)

  if node_info is not None:
    return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]

  raise errors.OpPrereqError("Invalid node name %s" % nodename,
                             errors.ECODE_NOENT)
+
+
def GetExclusiveStorageForNodeNames(cfg, nodelist):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type nodelist: list or tuple
  @param nodelist: node names for which to read the flag
  @rtype: dict
  @return: mapping from node names to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding node

  """
  # All flags are looked up first, the mapping is only built afterwards
  es_flags = [_GetESFlag(cfg, name) for name in nodelist]

  return dict(zip(nodelist, es_flags))
+
+
#: Generic encoders: mapping from argument kind (C{rpc_defs.ED_*}) to the
#: callable encoding a value of that kind
_ENCODERS = {
    rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
    rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
    rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
    rpc_defs.ED_COMPRESS: _Compress,
    rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
    rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
    rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
}
+
+
+class RpcRunner(_RpcClientBase,
+ _generated_rpc.RpcClientDefault,