4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
54 # Special module generated at build time
55 from ganeti import _generated_rpc
57 # pylint has a bug here, doesn't see this import
58 import ganeti.http.client # pylint: disable=W0611
# Headers sent with every RPC request; the node daemon only speaks JSON
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host
_OFFLINE = object()
71 """Initializes the module-global HTTP client manager.
73 Must be called before using any RPC function and while exactly one thread is
77 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78 # one thread running. This check is just a safety measure -- it doesn't
80 assert threading.activeCount() == 1, \
81 "Found more than one active thread when initializing pycURL"
83 logging.info("Using PycURL %s", pycurl.version)
85 pycurl.global_init(pycurl.GLOBAL_ALL)
89 """Stops the module-global HTTP client manager.
91 Must be called before quitting the program and while exactly one thread is
95 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Configures a pycURL object for node-daemon RPC.

  The node daemon certificate file is used as CA, client certificate and
  client key at the same time (it is passed to CAINFO, SSLCERT and SSLKEY
  below).

  @param curl: pycURL object to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)
  # Never follow redirects issued by the remote daemon
  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # NOTE(review): host-name verification is disabled; peer verification
  # against the cluster certificate (SSL_VERIFYPEER below) is what
  # authenticates the node -- confirm this matches the security model
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
113 """RPC-wrapper decorator.
115 When applied to a function, it runs it with the RPC system
116 initialized, and it shutsdown the system afterwards. This means the
117 function must be called without RPC being initialized.
120 def wrapper(*args, **kwargs):
123 return fn(*args, **kwargs)
def _Compress(_, data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  # Small amounts of data are not compressed; compressing them would cost
  # more than it saves
  if len(data) < 512:
    return (constants.RPC_ENCODING_NONE, data)

  # Compress with zlib and encode in base64
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data, 3)))
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      # Offline nodes are treated as failed without ever contacting them
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # The node daemon replies with a (success, payload) pair; anything
      # else is an RPC-layer error
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(err):
    """Helper to ensure we return a 'True' value for error."""
    if err:
      return err
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: context message, or None/empty for a default one
    @param prereq: whether to raise L{errors.OpPrereqError} instead of
        L{errors.OpExecError}
    @param ecode: optional error code passed as second exception argument

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)

    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError

    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )

    raise ec(*args) # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    @param msg: context message prepended to the failure text
    @param feedback_fn: callable accepting the combined message

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  ss = ssc()
  family = ss.GetPrimaryIPFamily()

  if ssconf_ips:
    # Each ssconf entry is "<name> <ip>"
    iplist = ss.GetNodePrimaryIPList()
    ipmap = dict(entry.split() for entry in iplist)
  else:
    ipmap = {}

  result = []
  for node in node_list:
    ip = ipmap.get(node)
    if ip is None:
      # Not known via ssconf, fall back to a DNS lookup
      ip = nslookup_fn(node, family=family)
    result.append((node, ip, node))

  return result
283 class _StaticResolver:
284 def __init__(self, addresses):
285 """Initializes this class.
288 self._addresses = addresses
290 def __call__(self, hosts, _):
291 """Returns static addresses for hosts.
294 assert len(hosts) == len(self._addresses)
295 return zip(hosts, self._addresses, hosts)
298 def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
299 """Checks if a node is online.
301 @type node_uuid_or_name: string
302 @param node_uuid_or_name: Node UUID
303 @type node: L{objects.Node} or None
304 @param node: Node object
308 # Assume that the passed parameter was actually a node name, so depend on
309 # DNS for name resolution
310 return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
312 if node.offline and not accept_offline_node:
316 return (node.name, ip, node_uuid_or_name)
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is not
  found in the configuration.

  @param single_node_fn: function returning one node object by UUID
  @param all_nodes_fn: function returning a dict of all node objects
  @param node_uuids: list of node UUIDs (or names, see above)
  @param opts: either None or L{rpc_defs.ACCEPT_OFFLINE_NODE}

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  # Special case for single-host lookups
  if len(node_uuids) == 1:
    (uuid, ) = node_uuids
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
  else:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
                             accept_offline_node)
            for uuid in node_uuids]
342 def __init__(self, resolver, port, lock_monitor_cb=None):
343 """Initializes this class.
345 @param resolver: callable accepting a list of node UUIDs or hostnames,
346 returning a list of tuples containing name, IP address and original name
347 of the resolved node. IP address can be the name or the special value
348 L{_OFFLINE} to mark offline machines.
350 @param port: TCP port
351 @param lock_monitor_cb: Callable for registering with lock monitor
354 self._resolver = resolver
356 self._lock_monitor_cb = lock_monitor_cb
359 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
360 """Prepares requests by sorting offline hosts into separate list.
363 @param body: a dictionary with per-host body data
369 assert isinstance(body, dict)
370 assert len(body) == len(hosts)
371 assert compat.all(isinstance(v, str) for v in body.values())
372 assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
373 "%s != %s" % (hosts, body.keys())
375 for (name, ip, original_name) in hosts:
377 # Node is marked as offline
378 results[original_name] = RpcResult(node=name,
382 requests[original_name] = \
383 http.client.HttpClientRequest(str(ip), port,
384 http.HTTP_POST, str("/%s" % procedure),
385 headers=_RPC_CLIENT_HEADERS,
386 post_data=body[original_name],
387 read_timeout=read_timeout,
388 nicename="%s/%s" % (name, procedure),
389 curl_config_fn=_ConfigRpcCurl)
391 return (results, requests)
394 def _CombineResults(results, requests, procedure):
395 """Combines pre-computed results for offline hosts with actual call results.
398 for name, req in requests.items():
399 if req.success and req.resp_status_code == http.HTTP_OK:
400 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
401 node=name, call=procedure)
403 # TODO: Better error reporting
409 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
410 host_result = RpcResult(data=msg, failed=True, node=name,
413 results[name] = host_result
417 def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
418 _req_process_fn=None):
419 """Makes an RPC request to a number of nodes.
421 @type nodes: sequence
422 @param nodes: node UUIDs or Hostnames
423 @type procedure: string
424 @param procedure: Request path
425 @type body: dictionary
426 @param body: dictionary with request bodies per host
427 @type read_timeout: int or None
428 @param read_timeout: Read timeout for request
430 @return: a dictionary mapping host names to rpc.RpcResult objects
433 assert read_timeout is not None, \
434 "Missing RPC read timeout for procedure '%s'" % procedure
436 if _req_process_fn is None:
437 _req_process_fn = http.client.ProcessRequests
439 (results, requests) = \
440 self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
441 procedure, body, read_timeout)
443 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
445 assert not frozenset(results).intersection(requests)
447 return self._CombineResults(results, requests, procedure)
450 class _RpcClientBase:
451 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
452 _req_process_fn=None):
453 """Initializes this class.
456 proc = _RpcProcessor(resolver,
457 netutils.GetDaemonPort(constants.NODED),
458 lock_monitor_cb=lock_monitor_cb)
459 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
460 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
463 def _EncodeArg(encoder_fn, node, (argkind, value)):
470 return encoder_fn(argkind)(node, value)
472 def _Call(self, cdef, node_list, args):
473 """Entry point for automatically generated RPC wrappers.
476 (procedure, _, resolver_opts, timeout, argdefs,
477 prep_fn, postproc_fn, _) = cdef
479 if callable(timeout):
480 read_timeout = timeout(args)
482 read_timeout = timeout
484 if callable(resolver_opts):
485 req_resolver_opts = resolver_opts(args)
487 req_resolver_opts = resolver_opts
489 if len(args) != len(argdefs):
490 raise errors.ProgrammerError("Number of passed arguments doesn't match")
493 prep_fn = lambda _, args: args
494 assert callable(prep_fn)
496 # encode the arguments for each node individually, pass them and the node
497 # name to the prep_fn, and serialise its return value
498 encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
499 zip(map(compat.snd, argdefs), args))
500 pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
503 result = self._proc(node_list, procedure, pnbody, read_timeout,
507 return dict(map(lambda (key, value): (key, postproc_fn(value)),
513 def _ObjectToDict(_, value):
514 """Converts an object to a dictionary.
516 @note: See L{objects}.
519 return value.ToDict()
def _ObjectListToDict(node, value):
  """Converts a list of L{objects} to dictionaries.

  @param node: node name, passed through to the per-object encoder
  @param value: iterable of objects each providing a C{ToDict} method
  @return: result of mapping L{_ObjectToDict} over C{value}

  """
  return map(compat.partial(_ObjectToDict, node), value)
def _PrepareFileUpload(getents_fn, node, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: callable returning a name-lookup object, or None to
    use L{runtime.GetEnts}
  @param node: node name (passed to the compression encoder)
  @param filename: path of the file to upload
  @return: list of [virtual path, encoded data, mode, owner, group,
    atime, mtime]

  """
  statcb = utils.FileStatHelper()
  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
  # FileStatHelper records the stat result while the file is still open
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
548 def _PrepareFinalizeExportDisks(_, snap_disks):
549 """Encodes disks for finalizing export.
554 for disk in snap_disks:
555 if isinstance(disk, bool):
556 flat_disks.append(disk)
558 flat_disks.append(disk.ToDict())
563 def _EncodeBlockdevRename(_, value):
564 """Encodes information for renaming block devices.
567 return [(d.ToDict(), uid) for d, uid in value]
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    type
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if lvm_pv_info:
    result["spindles_free"] = lvm_pv_info["storage_free"]
    result["spindles_total"] = lvm_pv_info["storage_size"]
  else:
    # No LVM physical-volume information available
    result["spindles_free"] = 0
    result["spindles_total"] = 0
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Extracts the storage space information of the disk template from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
        space_info, disk_template)
    result["name"] = disk_info["name"]
    result["storage_free"] = disk_info["storage_free"]
    result["storage_size"] = disk_info["storage_size"]
  else:
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  @param data: result triple of (bootid, space info, hypervisor info list)
  @param disk_template: disk template whose storage should be reported
  @rtype: dict
  @return: merged legacy node-info dictionary

  """
  (bootid, space_info, (hv_info, )) = data

  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  _AddSpindlesToLegacyNodeInfo(ret, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template)

  return ret
630 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
631 """Annotates just DRBD disks layouts.
634 assert disk.dev_type == constants.DT_DRBD8
636 disk.params = objects.FillDict(drbd_params, disk.params)
637 (dev_data, dev_meta) = disk.children
638 dev_data.params = objects.FillDict(data_params, dev_data.params)
639 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
644 def _AnnotateDParamsGeneric(disk, (params, )):
645 """Generic disk parameter annotation routine.
648 assert disk.dev_type != constants.DT_DRBD8
650 disk.params = objects.FillDict(params, disk.params)
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def AnnotateDisk(disk):
    # Diskless disks have no parameters to annotate
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)

    if disk.dev_type == constants.DT_DRBD8:
      return _AnnotateDParamsDRBD(disk, ld_params)

    return _AnnotateDParamsGeneric(disk, ld_params)

  # Work on copies so the caller's disk objects are left untouched
  return [AnnotateDisk(disk.Copy()) for disk in disks]
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Returns the exclusive storage flag for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuid: string
  @param node_uuid: node UUID
  @raise errors.OpPrereqError: if the node is not in the configuration

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  result = []
  for (storage_type, storage_key) in storage_units:
    if storage_type in [constants.ST_LVM_VG]:
      result.append((storage_type, storage_key, [es_flag]))
      # NOTE(review): with exclusive storage, PV space is reported as
      # well -- confirm this matches callers' expectations
      if es_flag:
        result.append((constants.ST_LVM_PV, storage_key, [es_flag]))
    else:
      result.append((storage_type, storage_key, []))
  return result
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
    node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                 storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)
#: Generic encoders usable without a configuration object
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
766 class RpcRunner(_RpcClientBase,
767 _generated_rpc.RpcClientDefault,
768 _generated_rpc.RpcClientBootstrap,
769 _generated_rpc.RpcClientDnsOnly,
770 _generated_rpc.RpcClientConfig):
774 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
775 """Initialized the RPC runner.
777 @type cfg: L{config.ConfigWriter}
778 @param cfg: Configuration
779 @type lock_monitor_cb: callable
780 @param lock_monitor_cb: Lock monitor callback
785 encoders = _ENCODERS.copy()
788 # Encoders requiring configuration object
789 rpc_defs.ED_INST_DICT: self._InstDict,
790 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
791 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
792 rpc_defs.ED_NIC_DICT: self._NicDict,
793 rpc_defs.ED_DEVICE_DICT: self._DeviceDict,
795 # Encoders annotating disk parameters
796 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
797 rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
798 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
799 rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
801 # Encoders with special requirements
802 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
804 rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
807 # Resolver using configuration
808 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
811 # Pylint doesn't recognize multiple inheritance properly, see
812 # <http://www.logilab.org/ticket/36586> and
813 # <http://www.logilab.org/ticket/35642>
814 # pylint: disable=W0233
815 _RpcClientBase.__init__(self, resolver, encoders.get,
816 lock_monitor_cb=lock_monitor_cb,
817 _req_process_fn=_req_process_fn)
818 _generated_rpc.RpcClientConfig.__init__(self)
819 _generated_rpc.RpcClientBootstrap.__init__(self)
820 _generated_rpc.RpcClientDnsOnly.__init__(self)
821 _generated_rpc.RpcClientDefault.__init__(self)
823 def _NicDict(self, _, nic):
824 """Convert the given nic to a dict and encapsulate netinfo
827 n = copy.deepcopy(nic)
829 net_uuid = self._cfg.LookupNetwork(n.network)
831 nobj = self._cfg.GetNetwork(net_uuid)
832 n.netinfo = objects.Network.ToDict(nobj)
835 def _DeviceDict(self, _, (device, instance)):
836 if isinstance(device, objects.NIC):
837 return self._NicDict(None, device)
838 elif isinstance(device, objects.Disk):
839 return self._SingleDiskDictDP(None, (device, instance))
841 def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
842 """Convert the given instance to a dict.
844 This is done via the instance's ToDict() method and additionally
845 we fill the hvparams with the cluster defaults.
847 @type instance: L{objects.Instance}
848 @param instance: an Instance object
849 @type hvp: dict or None
850 @param hvp: a dictionary with overridden hypervisor parameters
851 @type bep: dict or None
852 @param bep: a dictionary with overridden backend parameters
853 @type osp: dict or None
854 @param osp: a dictionary with overridden os parameters
856 @return: the instance dict, with the hvparams filled with the
860 idict = instance.ToDict()
861 cluster = self._cfg.GetClusterInfo()
862 idict["hvparams"] = cluster.FillHV(instance)
864 idict["hvparams"].update(hvp)
865 idict["beparams"] = cluster.FillBE(instance)
867 idict["beparams"].update(bep)
868 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
870 idict["osparams"].update(osp)
871 idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
872 for nic in idict["nics"]:
873 nic["nicparams"] = objects.FillDict(
874 cluster.nicparams[constants.PP_DEFAULT],
876 network = nic.get("network", None)
878 net_uuid = self._cfg.LookupNetwork(network)
880 nobj = self._cfg.GetNetwork(net_uuid)
881 nic["netinfo"] = objects.Network.ToDict(nobj)
884 def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
885 """Wrapper for L{_InstDict}.
888 return self._InstDict(node, instance, hvp=hvp, bep=bep)
890 def _InstDictOspDp(self, node, (instance, osparams)):
891 """Wrapper for L{_InstDict}.
894 return self._InstDict(node, instance, osp=osparams)
896 def _DisksDictDP(self, node, (disks, instance)):
897 """Wrapper for L{AnnotateDiskParams}.
900 diskparams = self._cfg.GetInstanceDiskParams(instance)
902 for disk in AnnotateDiskParams(disks, diskparams):
903 disk_node_uuids = disk.GetNodes(instance.primary_node)
904 node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
905 in self._cfg.GetMultiNodeInfo(disk_node_uuids))
907 disk.UpdateDynamicDiskParams(node, node_ips)
909 ret.append(disk.ToDict(include_dynamic_params=True))
913 def _MultiDiskDictDP(self, node, disks_insts):
914 """Wrapper for L{AnnotateDiskParams}.
916 Supports a list of (disk, instance) tuples.
918 return [disk for disk_inst in disks_insts
919 for disk in self._DisksDictDP(node, disk_inst)]
921 def _SingleDiskDictDP(self, node, (disk, instance)):
922 """Wrapper for L{AnnotateDiskParams}.
925 (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
928 def _EncodeNodeToDiskDictDP(self, node, value):
929 """Encode dict of node name -> list of (disk, instance) tuples as values.
932 return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
933 for name, disks in value.items())
935 def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
936 """Encodes import/export I/O information.
939 if ieio == constants.IEIO_RAW_DISK:
940 assert len(ieioargs) == 1
941 return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))
943 if ieio == constants.IEIO_SCRIPT:
944 assert len(ieioargs) == 2
945 return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
947 return (ieio, ieioargs)
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: Ganeti context (provides the lock monitor)
    @param address_list: optional list of node addresses; when None the
      ssconf-based resolver is used instead

    """
    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
                            _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # ssconf IPs disabled: resolve node names via DNS only
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
                            _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    @param context: Ganeti context (for the lock monitor), or None
    @param address_list: optional list of node addresses; when None the
      ssconf-based resolver is used instead

    """
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    encoders = _ENCODERS.copy()
    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)