4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
54 # Special module generated at build time
55 from ganeti import _generated_rpc
57 # pylint has a bug here, doesn't see this import
58 import ganeti.http.client # pylint: disable=W0611
#: HTTP headers sent with every RPC request; "Expect:" disables libcurl's
#: automatic "Expect: 100-continue" handling for POST bodies
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host
_OFFLINE = object()
71 """Initializes the module-global HTTP client manager.
73 Must be called before using any RPC function and while exactly one thread is
77 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78 # one thread running. This check is just a safety measure -- it doesn't
80 assert threading.activeCount() == 1, \
81 "Found more than one active thread when initializing pycURL"
83 logging.info("Using PycURL %s", pycurl.version)
85 pycurl.global_init(pycurl.GLOBAL_ALL)
89 """Stops the module-global HTTP client manager.
91 Must be called before quitting the program and while exactly one thread is
95 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for a node daemon RPC call.

  The node daemon certificate file is used both as CA (peer verification)
  and as client certificate/key.

  @param curl: pycURL handle to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # Hostname verification is disabled while peer verification stays on;
  # NOTE(review): presumably the node certificates do not carry hostnames
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
113 """RPC-wrapper decorator.
115 When applied to a function, it runs it with the RPC system
116 initialized, and it shutsdown the system afterwards. This means the
117 function must be called without RPC being initialized.
120 def wrapper(*args, **kwargs):
123 return fn(*args, **kwargs)
130 """Compresses a string for transport over RPC.
132 Small amounts of data are not compressed.
137 @return: Encoded data to send
140 # Small amounts of data are not compressed
142 return (constants.RPC_ENCODING_NONE, data)
144 # Compress with zlib and encode in base64
145 return (constants.RPC_ENCODING_ZLIB_BASE64,
146 base64.b64encode(zlib.compress(data, 3)))
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  ss = ssc()
  family = ss.GetPrimaryIPFamily()

  if ssconf_ips:
    iplist = ss.GetNodePrimaryIPList()
    # Each ssconf entry is "<name> <ip>"
    ipmap = dict(entry.split() for entry in iplist)
  else:
    ipmap = {}

  result = []
  for node in node_list:
    ip = ipmap.get(node)
    if ip is None:
      # Fall back to DNS for nodes not found in ssconf
      ip = nslookup_fn(node, family=family)
    result.append((node, ip))

  return result
283 class _StaticResolver:
284 def __init__(self, addresses):
285 """Initializes this class.
288 self._addresses = addresses
290 def __call__(self, hosts, _):
291 """Returns static addresses for hosts.
294 assert len(hosts) == len(self._addresses)
295 return zip(hosts, self._addresses)
298 def _CheckConfigNode(name, node, accept_offline_node):
299 """Checks if a node is online.
302 @param name: Node name
303 @type node: L{objects.Node} or None
304 @param node: Node object
308 # Depend on DNS for name resolution
310 elif node.offline and not accept_offline_node:
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
  """Calculate node addresses using configuration.

  @param single_node_fn: function returning one node object by name
  @param all_nodes_fn: function returning a mapping of all node objects
  @param hosts: list of node names to resolve
  @param opts: either C{None} or L{rpc_defs.ACCEPT_OFFLINE_NODE}

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  # Special case for single-host lookups
  if len(hosts) == 1:
    (name, ) = hosts
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
  else:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(name, all_nodes.get(name, None),
                             accept_offline_node)
            for name in hosts]
337 def __init__(self, resolver, port, lock_monitor_cb=None):
338 """Initializes this class.
340 @param resolver: callable accepting a list of hostnames, returning a list
341 of tuples containing name and IP address (IP address can be the name or
342 the special value L{_OFFLINE} to mark offline machines)
344 @param port: TCP port
345 @param lock_monitor_cb: Callable for registering with lock monitor
348 self._resolver = resolver
350 self._lock_monitor_cb = lock_monitor_cb
353 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
354 """Prepares requests by sorting offline hosts into separate list.
357 @param body: a dictionary with per-host body data
363 assert isinstance(body, dict)
364 assert len(body) == len(hosts)
365 assert compat.all(isinstance(v, str) for v in body.values())
366 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
367 "%s != %s" % (hosts, body.keys())
369 for (name, ip) in hosts:
371 # Node is marked as offline
372 results[name] = RpcResult(node=name, offline=True, call=procedure)
375 http.client.HttpClientRequest(str(ip), port,
376 http.HTTP_POST, str("/%s" % procedure),
377 headers=_RPC_CLIENT_HEADERS,
378 post_data=body[name],
379 read_timeout=read_timeout,
380 nicename="%s/%s" % (name, procedure),
381 curl_config_fn=_ConfigRpcCurl)
383 return (results, requests)
386 def _CombineResults(results, requests, procedure):
387 """Combines pre-computed results for offline hosts with actual call results.
390 for name, req in requests.items():
391 if req.success and req.resp_status_code == http.HTTP_OK:
392 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393 node=name, call=procedure)
395 # TODO: Better error reporting
401 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402 host_result = RpcResult(data=msg, failed=True, node=name,
405 results[name] = host_result
409 def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
410 _req_process_fn=None):
411 """Makes an RPC request to a number of nodes.
413 @type hosts: sequence
414 @param hosts: Hostnames
415 @type procedure: string
416 @param procedure: Request path
417 @type body: dictionary
418 @param body: dictionary with request bodies per host
419 @type read_timeout: int or None
420 @param read_timeout: Read timeout for request
422 @return: a dictionary mapping host names to rpc.RpcResult objects
425 assert read_timeout is not None, \
426 "Missing RPC read timeout for procedure '%s'" % procedure
428 if _req_process_fn is None:
429 _req_process_fn = http.client.ProcessRequests
431 (results, requests) = \
432 self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
433 procedure, body, read_timeout)
435 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
437 assert not frozenset(results).intersection(requests)
439 return self._CombineResults(results, requests, procedure)
442 class _RpcClientBase:
443 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
444 _req_process_fn=None):
445 """Initializes this class.
448 proc = _RpcProcessor(resolver,
449 netutils.GetDaemonPort(constants.NODED),
450 lock_monitor_cb=lock_monitor_cb)
451 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
452 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
455 def _EncodeArg(encoder_fn, (argkind, value)):
462 return encoder_fn(argkind)(value)
464 def _Call(self, cdef, node_list, args):
465 """Entry point for automatically generated RPC wrappers.
468 (procedure, _, resolver_opts, timeout, argdefs,
469 prep_fn, postproc_fn, _) = cdef
471 if callable(timeout):
472 read_timeout = timeout(args)
474 read_timeout = timeout
476 if callable(resolver_opts):
477 req_resolver_opts = resolver_opts(args)
479 req_resolver_opts = resolver_opts
481 if len(args) != len(argdefs):
482 raise errors.ProgrammerError("Number of passed arguments doesn't match")
484 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
486 # for a no-op prep_fn, we serialise the body once, and then we
487 # reuse it in the dictionary values
488 body = serializer.DumpJson(enc_args)
489 pnbody = dict((n, body) for n in node_list)
491 # for a custom prep_fn, we pass the encoded arguments and the
492 # node name to the prep_fn, and we serialise its return value
493 assert callable(prep_fn)
494 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
497 result = self._proc(node_list, procedure, pnbody, read_timeout,
501 return dict(map(lambda (key, value): (key, postproc_fn(value)),
def _ObjectToDict(value):
  """Converts an object to a dictionary.

  @note: See L{objects}.
  @param value: object exposing a C{ToDict} method
  @return: the object's C{ToDict} representation

  """
  return value.ToDict()
def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  @param value: iterable of objects exposing a C{ToDict} method
  @return: per-object C{ToDict} results

  """
  return map(_ObjectToDict, value)
def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  @param value: mapping of node name to an iterable of disk objects
  @return: mapping of node name to the disks' dictionary representations

  """
  return dict((name, _ObjectListToDict(disks))
              for name, disks in value.items())
def _PrepareFileUpload(getents_fn, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: callable returning a uid/gid lookup object, or C{None}
    to use L{runtime.GetEnts}
  @param filename: path of the file to upload
  @return: RPC argument list: virtual path, compressed contents, mode,
    owner name, group name, atime and mtime

  """
  statcb = utils.FileStatHelper()
  data = _Compress(utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
550 def _PrepareFinalizeExportDisks(snap_disks):
551 """Encodes disks for finalizing export.
556 for disk in snap_disks:
557 if isinstance(disk, bool):
558 flat_disks.append(disk)
560 flat_disks.append(disk.ToDict())
def _EncodeImportExportIO((ieio, ieioargs)):
  """Encodes import/export I/O information.

  @param ieio: I/O type, compared against C{constants.IEIO_*} values
  @param ieioargs: I/O arguments; disk objects are serialized via C{ToDict}
  @return: tuple of (I/O type, encoded arguments)

  """
  if ieio == constants.IEIO_RAW_DISK:
    # Single disk object, serialized
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    # Disk object plus a second argument that is passed through as-is
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  # All other I/O types are passed through unchanged
  return (ieio, ieioargs)
def _EncodeBlockdevRename(value):
  """Encodes information for renaming block devices.

  @param value: iterable of (disk object, new unique id) pairs
  @return: list of (disk dictionary, new unique id) pairs

  """
  return [(d.ToDict(), uid) for d, uid in value]
def BuildVgInfoQuery(cfg):
  """Build a query about the default VG for C{node_info}.

  The result of the RPC can be parsed with L{MakeLegacyNodeInfo}.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: argument suitable for L{rpc.RpcRunner.call_node_info}

  """
  vg_name = cfg.GetVGName()
  # Two storage units for the default VG: the VG itself and its PVs
  # (spindles); MakeLegacyNodeInfo unpacks exactly these two results.
  return [
    (constants.ST_LVM_VG, vg_name),
    (constants.ST_LVM_PV, vg_name),
    ]
def MakeLegacyNodeInfo(data, require_vg_info=True):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  @param data: payload as returned by C{call_node_info}: (bootid, volume
    group results, one-element hypervisor result list)
  @param require_vg_info: raise an error if the returnd vg_info
    doesn't have any values

  """
  (bootid, vgs_info, (hv_info, )) = data

  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  if require_vg_info or vgs_info:
    # Unpacking raises if vg_info is required but empty
    (vg0_info, vg0_spindles) = vgs_info
    ret = utils.JoinDisjointDicts(vg0_info, ret)
    ret["spindles_free"] = vg0_spindles["vg_free"]
    ret["spindles_total"] = vg0_spindles["vg_size"]

  return ret
632 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
633 """Annotates just DRBD disks layouts.
636 assert disk.dev_type == constants.LD_DRBD8
638 disk.params = objects.FillDict(drbd_params, disk.params)
639 (dev_data, dev_meta) = disk.children
640 dev_data.params = objects.FillDict(data_params, dev_data.params)
641 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
646 def _AnnotateDParamsGeneric(disk, (params, )):
647 """Generic disk parameter annotation routine.
650 assert disk.dev_type != constants.LD_DRBD8
652 disk.params = objects.FillDict(params, disk.params)
def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk paramaters for annotation
  @returns: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  if template == constants.DT_DRBD8:
    annotation_fn = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    # Diskless instances carry no disk parameters
    annotation_fn = lambda disk, _: disk
  else:
    annotation_fn = _AnnotateDParamsGeneric

  # Copies are annotated so the originals stay untouched
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
def _GetESFlag(cfg, nodename):
  """Returns the exclusive storage flag for one node.

  @raise errors.OpPrereqError: if the node name is unknown

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
686 def GetExclusiveStorageForNodeNames(cfg, nodelist):
687 """Return the exclusive storage flag for all the given nodes.
689 @type cfg: L{config.ConfigWriter}
690 @param cfg: cluster configuration
691 @type nodelist: list or tuple
692 @param nodelist: node names for which to read the flag
694 @return: mapping from node names to exclusive storage flags
695 @raise errors.OpPrereqError: if any given node name has no corresponding node
698 getflag = lambda n: _GetESFlag(cfg, n)
699 flags = map(getflag, nodelist)
700 return dict(zip(nodelist, flags))
#: Generic encoders, used by all RPC client classes
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
715 class RpcRunner(_RpcClientBase,
716 _generated_rpc.RpcClientDefault,
717 _generated_rpc.RpcClientBootstrap,
718 _generated_rpc.RpcClientDnsOnly,
719 _generated_rpc.RpcClientConfig):
723 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
724 """Initialized the RPC runner.
726 @type cfg: L{config.ConfigWriter}
727 @param cfg: Configuration
728 @type lock_monitor_cb: callable
729 @param lock_monitor_cb: Lock monitor callback
734 encoders = _ENCODERS.copy()
737 # Encoders requiring configuration object
738 rpc_defs.ED_INST_DICT: self._InstDict,
739 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
740 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
741 rpc_defs.ED_NIC_DICT: self._NicDict,
743 # Encoders annotating disk parameters
744 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
745 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
747 # Encoders with special requirements
748 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
751 # Resolver using configuration
752 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
755 # Pylint doesn't recognize multiple inheritance properly, see
756 # <http://www.logilab.org/ticket/36586> and
757 # <http://www.logilab.org/ticket/35642>
758 # pylint: disable=W0233
759 _RpcClientBase.__init__(self, resolver, encoders.get,
760 lock_monitor_cb=lock_monitor_cb,
761 _req_process_fn=_req_process_fn)
762 _generated_rpc.RpcClientConfig.__init__(self)
763 _generated_rpc.RpcClientBootstrap.__init__(self)
764 _generated_rpc.RpcClientDnsOnly.__init__(self)
765 _generated_rpc.RpcClientDefault.__init__(self)
767 def _NicDict(self, nic):
768 """Convert the given nic to a dict and encapsulate netinfo
771 n = copy.deepcopy(nic)
773 net_uuid = self._cfg.LookupNetwork(n.network)
775 nobj = self._cfg.GetNetwork(net_uuid)
776 n.netinfo = objects.Network.ToDict(nobj)
779 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
780 """Convert the given instance to a dict.
782 This is done via the instance's ToDict() method and additionally
783 we fill the hvparams with the cluster defaults.
785 @type instance: L{objects.Instance}
786 @param instance: an Instance object
787 @type hvp: dict or None
788 @param hvp: a dictionary with overridden hypervisor parameters
789 @type bep: dict or None
790 @param bep: a dictionary with overridden backend parameters
791 @type osp: dict or None
792 @param osp: a dictionary with overridden os parameters
794 @return: the instance dict, with the hvparams filled with the
798 idict = instance.ToDict()
799 cluster = self._cfg.GetClusterInfo()
800 idict["hvparams"] = cluster.FillHV(instance)
802 idict["hvparams"].update(hvp)
803 idict["beparams"] = cluster.FillBE(instance)
805 idict["beparams"].update(bep)
806 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
808 idict["osparams"].update(osp)
809 idict["disks"] = self._DisksDictDP((instance.disks, instance))
810 for nic in idict["nics"]:
811 nic["nicparams"] = objects.FillDict(
812 cluster.nicparams[constants.PP_DEFAULT],
814 network = nic.get("network", None)
816 net_uuid = self._cfg.LookupNetwork(network)
818 nobj = self._cfg.GetNetwork(net_uuid)
819 nic["netinfo"] = objects.Network.ToDict(nobj)
822 def _InstDictHvpBepDp(self, (instance, hvp, bep)):
823 """Wrapper for L{_InstDict}.
826 return self._InstDict(instance, hvp=hvp, bep=bep)
828 def _InstDictOspDp(self, (instance, osparams)):
829 """Wrapper for L{_InstDict}.
832 return self._InstDict(instance, osp=osparams)
834 def _DisksDictDP(self, (disks, instance)):
835 """Wrapper for L{AnnotateDiskParams}.
838 diskparams = self._cfg.GetInstanceDiskParams(instance)
839 return [disk.ToDict()
840 for disk in AnnotateDiskParams(instance.disk_template,
843 def _SingleDiskDictDP(self, (disk, instance)):
844 """Wrapper for L{AnnotateDiskParams}.
847 (anno_disk,) = self._DisksDictDP(([disk], instance))
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    """
    if address_list is None:
      # Resolve node names via ssconf
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
                            _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # Resolve names via DNS only (ssconf_ips=False)
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
                            _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    """
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

    if address_list is None:
      # Resolve node names via ssconf
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    encoders = _ENCODERS.copy()

    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)