4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
54 # Special module generated at build time
55 from ganeti import _generated_rpc
57 # pylint has a bug here, doesn't see this import
58 import ganeti.http.client # pylint: disable=W0611
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
66 #: Special value to describe an offline host
71 """Initializes the module-global HTTP client manager.
73 Must be called before using any RPC function and while exactly one thread is
77 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78 # one thread running. This check is just a safety measure -- it doesn't
80 assert threading.activeCount() == 1, \
81 "Found more than one active thread when initializing pycURL"
83 logging.info("Using PycURL %s", pycurl.version)
85 pycurl.global_init(pycurl.GLOBAL_ALL)
89 """Stops the module-global HTTP client manager.
91 Must be called before quitting the program and while exactly one thread is
95 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the RPC client settings to a cURL handle.

  The node daemon certificate file serves as CA bundle, client certificate
  and client key alike (all in PEM format). Peer verification is switched
  on, the host name check is set to 0, redirects are not followed and the
  connect timeout is taken from L{constants.RPC_CONNECT_TIMEOUT}.

  @param curl: object offering C{setopt(option, value)}; presumably a
    C{pycurl.Curl} handle -- confirm against the caller

  """
  cert_path = str(pathutils.NODED_CERT_FILE)

  # Applied strictly in the order listed here
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT),
    ]

  for (option, setting) in settings:
    curl.setopt(option, setting)
113 """RPC-wrapper decorator.
115 When applied to a function, it runs it with the RPC system
116 initialized, and it shuts down the system afterwards. This means the
117 function must be called without RPC being initialized.
120 def wrapper(*args, **kwargs):
123 return fn(*args, **kwargs)
130 """Compresses a string for transport over RPC.
132 Small amounts of data are not compressed.
137 @return: Encoded data to send
140 # Small amounts of data are not compressed
142 return (constants.RPC_ENCODING_NONE, data)
144 # Compress with zlib and encode in base64
145 return (constants.RPC_ENCODING_ZLIB_BASE64,
146 base64.b64encode(zlib.compress(data, 3)))
149 class RpcResult(object):
152 This class holds an RPC result. It is needed since in multi-node
153 calls we can't raise an exception just because one out of many
154 failed, and therefore we use this class to encapsulate the result.
156 @ivar data: the data payload, for successful results, or None
157 @ivar call: the name of the RPC call
158 @ivar node: the name of the node to which we made the call
159 @ivar offline: whether the operation failed because the node was
160 offline, as opposed to actual failure; offline=True will always
161 imply failed=True, in order to allow simpler checking if
162 the user doesn't care about the exact failure mode
163 @ivar fail_msg: the error message if the call failed
166 def __init__(self, data=None, failed=False, offline=False,
167 call=None, node=None):
168 self.offline = offline
173 self.fail_msg = "Node is marked offline"
174 self.data = self.payload = None
176 self.fail_msg = self._EnsureErr(data)
177 self.data = self.payload = None
180 if not isinstance(self.data, (tuple, list)):
181 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
185 self.fail_msg = ("RPC layer error: invalid result length (%d), "
186 "expected 2" % len(self.data))
188 elif not self.data[0]:
189 self.fail_msg = self._EnsureErr(self.data[1])
194 self.payload = data[1]
196 for attr_name in ["call", "data", "fail_msg",
197 "node", "offline", "payload"]:
198 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
202 """Helper to ensure we return a 'True' value for error."""
206 return "No error information"
208 def Raise(self, msg, prereq=False, ecode=None):
209 """If the result has failed, raise an OpExecError.
211 This is used so that LU code doesn't have to check for each
212 result, but instead can call this function.
215 if not self.fail_msg:
218 if not msg: # one could pass None for default message
219 msg = ("Call '%s' to node '%s' has failed: %s" %
220 (self.call, self.node, self.fail_msg))
222 msg = "%s: %s" % (msg, self.fail_msg)
224 ec = errors.OpPrereqError
226 ec = errors.OpExecError
227 if ecode is not None:
231 raise ec(*args) # pylint: disable=W0142
234 def _SsconfResolver(ssconf_ips, node_list, _,
235 ssc=ssconf.SimpleStore,
236 nslookup_fn=netutils.Hostname.GetIP):
237 """Return addresses for given node names.
239 @type ssconf_ips: bool
240 @param ssconf_ips: Use the ssconf IPs
241 @type node_list: list
242 @param node_list: List of node names
244 @param ssc: SimpleStore class that is used to obtain node->ip mappings
245 @type nslookup_fn: callable
246 @param nslookup_fn: function used to do NS lookup
247 @rtype: list of tuple; (string, string)
248 @return: List of tuples containing node name and IP address
252 family = ss.GetPrimaryIPFamily()
255 iplist = ss.GetNodePrimaryIPList()
256 ipmap = dict(entry.split() for entry in iplist)
261 for node in node_list:
264 ip = nslookup_fn(node, family=family)
265 result.append((node, ip))
270 class _StaticResolver:
271 def __init__(self, addresses):
272 """Initializes this class.
275 self._addresses = addresses
277 def __call__(self, hosts, _):
278 """Returns static addresses for hosts.
281 assert len(hosts) == len(self._addresses)
282 return zip(hosts, self._addresses)
285 def _CheckConfigNode(name, node, accept_offline_node):
286 """Checks if a node is online.
289 @param name: Node name
290 @type node: L{objects.Node} or None
291 @param node: Node object
295 # Depend on DNS for name resolution
297 elif node.offline and not accept_offline_node:
304 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305 """Calculate node addresses using configuration.
308 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
310 assert accept_offline_node or opts is None, "Unknown option"
312 # Special case for single-host lookups
315 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
317 all_nodes = all_nodes_fn()
318 return [_CheckConfigNode(name, all_nodes.get(name, None),
324 def __init__(self, resolver, port, lock_monitor_cb=None):
325 """Initializes this class.
327 @param resolver: callable accepting a list of hostnames, returning a list
328 of tuples containing name and IP address (IP address can be the name or
329 the special value L{_OFFLINE} to mark offline machines)
331 @param port: TCP port
332 @param lock_monitor_cb: Callable for registering with lock monitor
335 self._resolver = resolver
337 self._lock_monitor_cb = lock_monitor_cb
340 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
341 """Prepares requests by sorting offline hosts into separate list.
344 @param body: a dictionary with per-host body data
350 assert isinstance(body, dict)
351 assert len(body) == len(hosts)
352 assert compat.all(isinstance(v, str) for v in body.values())
353 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354 "%s != %s" % (hosts, body.keys())
356 for (name, ip) in hosts:
358 # Node is marked as offline
359 results[name] = RpcResult(node=name, offline=True, call=procedure)
362 http.client.HttpClientRequest(str(ip), port,
363 http.HTTP_POST, str("/%s" % procedure),
364 headers=_RPC_CLIENT_HEADERS,
365 post_data=body[name],
366 read_timeout=read_timeout,
367 nicename="%s/%s" % (name, procedure),
368 curl_config_fn=_ConfigRpcCurl)
370 return (results, requests)
373 def _CombineResults(results, requests, procedure):
374 """Combines pre-computed results for offline hosts with actual call results.
377 for name, req in requests.items():
378 if req.success and req.resp_status_code == http.HTTP_OK:
379 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380 node=name, call=procedure)
382 # TODO: Better error reporting
388 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389 host_result = RpcResult(data=msg, failed=True, node=name,
392 results[name] = host_result
def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
             _req_process_fn=None):
  """Sends one RPC request to each of the given nodes.

  The hosts are resolved through the resolver given at construction time,
  split into pre-computed results and requests by C{_PrepareRequests}, the
  requests are executed and everything is merged by C{_CombineResults}.

  @type hosts: sequence
  @param hosts: Hostnames
  @type procedure: string
  @param procedure: Request path
  @type body: dictionary
  @param body: dictionary with request bodies per host
  @type read_timeout: int or None
  @param read_timeout: Read timeout for request; must not be None
  @param resolver_opts: handed unchanged to the resolver as second argument
  @param _req_process_fn: callable executing the prepared requests;
    L{http.client.ProcessRequests} is used when None
  @return: a dictionary mapping host names to rpc.RpcResult objects

  """
  assert read_timeout is not None, \
    "Missing RPC read timeout for procedure '%s'" % procedure

  process_fn = _req_process_fn
  if process_fn is None:
    process_fn = http.client.ProcessRequests

  resolved_hosts = self._resolver(hosts, resolver_opts)
  (results, requests) = self._PrepareRequests(resolved_hosts, self._port,
                                              procedure, body, read_timeout)

  process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

  # A host is either answered up front or has a request, never both
  assert not frozenset(results).intersection(requests)

  return self._CombineResults(results, requests, procedure)
429 class _RpcClientBase:
430 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431 _req_process_fn=None):
432 """Initializes this class.
435 proc = _RpcProcessor(resolver,
436 netutils.GetDaemonPort(constants.NODED),
437 lock_monitor_cb=lock_monitor_cb)
438 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
442 def _EncodeArg(encoder_fn, (argkind, value)):
449 return encoder_fn(argkind)(value)
451 def _Call(self, cdef, node_list, args):
452 """Entry point for automatically generated RPC wrappers.
455 (procedure, _, resolver_opts, timeout, argdefs,
456 prep_fn, postproc_fn, _) = cdef
458 if callable(timeout):
459 read_timeout = timeout(args)
461 read_timeout = timeout
463 if callable(resolver_opts):
464 req_resolver_opts = resolver_opts(args)
466 req_resolver_opts = resolver_opts
468 if len(args) != len(argdefs):
469 raise errors.ProgrammerError("Number of passed arguments doesn't match")
471 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
473 # for a no-op prep_fn, we serialise the body once, and then we
474 # reuse it in the dictionary values
475 body = serializer.DumpJson(enc_args)
476 pnbody = dict((n, body) for n in node_list)
478 # for a custom prep_fn, we pass the encoded arguments and the
479 # node name to the prep_fn, and we serialise its return value
480 assert callable(prep_fn)
481 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
484 result = self._proc(node_list, procedure, pnbody, read_timeout,
488 return dict(map(lambda (key, value): (key, postproc_fn(value)),
494 def _ObjectToDict(value):
495 """Converts an object to a dictionary.
497 @note: See L{objects}.
500 return value.ToDict()
def _ObjectListToDict(value):
  """Converts every element of a list of L{objects} to its dictionary form.

  @param value: iterable of objects accepted by L{_ObjectToDict}
  @return: list with one converted entry per element, in the same order

  """
  return [_ObjectToDict(item) for item in value]
def _EncodeNodeToDiskDict(value):
  """Encodes a mapping from node names to lists of disk objects.

  @param value: dictionary with node names as keys and lists of disk
    objects as values
  @return: new dictionary with the same keys whose values were converted
    by L{_ObjectListToDict}

  """
  encoded = {}
  for (node_name, node_disks) in value.items():
    encoded[node_name] = _ObjectListToDict(node_disks)
  return encoded
518 def _PrepareFileUpload(getents_fn, filename):
519 """Loads a file and prepares it for an upload to nodes.
522 statcb = utils.FileStatHelper()
523 data = _Compress(utils.ReadFile(filename, preread=statcb))
526 if getents_fn is None:
527 getents_fn = runtime.GetEnts
529 getents = getents_fn()
531 virt_filename = vcluster.MakeVirtualPath(filename)
533 return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
537 def _PrepareFinalizeExportDisks(snap_disks):
538 """Encodes disks for finalizing export.
543 for disk in snap_disks:
544 if isinstance(disk, bool):
545 flat_disks.append(disk)
547 flat_disks.append(disk.ToDict())
def _EncodeImportExportIO(ieio_spec):
  """Encodes import/export I/O information.

  The argument is unpacked explicitly instead of through a tuple parameter;
  tuple parameters are not valid syntax on Python 3, while callers keep
  passing a single (ieio, ieioargs) pair exactly as before.

  @type ieio_spec: tuple; (ieio, ieioargs)
  @param ieio_spec: I/O mode and its arguments. For
    L{constants.IEIO_RAW_DISK} the arguments hold exactly one object, for
    L{constants.IEIO_SCRIPT} exactly two, the first being an object; in
    both cases the object is replaced by its C{ToDict()} form
  @rtype: tuple
  @return: (ieio, encoded arguments); arguments of any other mode are
    returned unchanged

  """
  (ieio, ieioargs) = ieio_spec

  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  return (ieio, ieioargs)
567 def _EncodeBlockdevRename(value):
568 """Encodes information for renaming block devices.
571 return [(d.ToDict(), uid) for d, uid in value]
def MakeLegacyNodeInfo(data, require_vg_info=True):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  @param data: three-element sequence (bootid, vgs_info, (hv_info, )); the
    last element must hold exactly one hypervisor dictionary
  @param require_vg_info: raise an error if the returned vg_info
      doesn't have any values; when False, an empty vg_info is skipped.
      Whenever vg_info is used it must hold exactly one entry, otherwise
      the unpacking fails
  @rtype: dict
  @return: hypervisor information joined with C{{"bootid": bootid}} and, if
    used, with the information of the single volume group

  """
  (bootid, vgs_info, (hv_info, )) = data

  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  if require_vg_info or vgs_info:
    (vg0_info, ) = vgs_info
    ret = utils.JoinDisjointDicts(vg0_info, ret)

  return ret
def _AnnotateDParamsDRBD(disk, ld_params):
  """Annotates just DRBD disks layouts.

  Fills the parameters of the DRBD device itself and of its two children
  using L{objects.FillDict}. The disk object is modified in place.

  @param disk: disk whose C{dev_type} is L{constants.LD_DRBD8} and which
    has exactly two children (data and metadata device)
  @type ld_params: tuple; (drbd_params, data_params, meta_params)
  @param ld_params: parameters for the DRBD device, its data child and its
    metadata child, in this order
  @return: the annotated disk, i.e. the object passed in

  """
  (drbd_params, data_params, meta_params) = ld_params

  assert disk.dev_type == constants.LD_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
def _AnnotateDParamsGeneric(disk, ld_params):
  """Generic disk parameter annotation routine.

  Fills C{disk.params} using L{objects.FillDict}. The disk object is
  modified in place.

  @param disk: disk whose C{dev_type} is not L{constants.LD_DRBD8}
  @type ld_params: tuple; (params, )
  @param ld_params: one-element sequence holding the parameters to apply
  @return: the annotated disk, i.e. the object passed in

  """
  (params, ) = ld_params

  assert disk.dev_type != constants.LD_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  The annotation is applied to the result of C{disk.Copy()}, not to the
  disk objects passed in.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @return: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  if template == constants.DT_DRBD8:
    annotation_fn = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    # Nothing to annotate, hand the copy back as it is
    annotation_fn = lambda disk, _: disk
  else:
    annotation_fn = _AnnotateDParamsGeneric

  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
641 def _GetESFlag(cfg, nodename):
642 ni = cfg.GetNodeInfo(nodename)
644 raise errors.OpPrereqError("Invalid node name %s" % nodename,
646 return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
def GetExclusiveStorageForNodeNames(cfg, nodelist):
  """Looks up the exclusive storage flag of every given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type nodelist: list or tuple
  @param nodelist: node names for which to read the flag
  @rtype: dict
  @return: mapping from node names to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding node

  """
  # All flags are looked up first, then paired with their node names
  flags = [_GetESFlag(cfg, node_name) for node_name in nodelist]
  return dict(zip(nodelist, flags))
668 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
669 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
670 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
671 rpc_defs.ED_COMPRESS: _Compress,
672 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
673 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
674 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
678 class RpcRunner(_RpcClientBase,
679 _generated_rpc.RpcClientDefault,
680 _generated_rpc.RpcClientBootstrap,
681 _generated_rpc.RpcClientDnsOnly,
682 _generated_rpc.RpcClientConfig):
686 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
687 """Initializes the RPC runner.
689 @type cfg: L{config.ConfigWriter}
690 @param cfg: Configuration
691 @type lock_monitor_cb: callable
692 @param lock_monitor_cb: Lock monitor callback
697 encoders = _ENCODERS.copy()
700 # Encoders requiring configuration object
701 rpc_defs.ED_INST_DICT: self._InstDict,
702 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
703 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
704 rpc_defs.ED_NIC_DICT: self._NicDict,
705 rpc_defs.ED_DEVICE_DICT: self._DeviceDict,
707 # Encoders annotating disk parameters
708 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
709 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
711 # Encoders with special requirements
712 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
715 # Resolver using configuration
716 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
719 # Pylint doesn't recognize multiple inheritance properly, see
720 # <http://www.logilab.org/ticket/36586> and
721 # <http://www.logilab.org/ticket/35642>
722 # pylint: disable=W0233
723 _RpcClientBase.__init__(self, resolver, encoders.get,
724 lock_monitor_cb=lock_monitor_cb,
725 _req_process_fn=_req_process_fn)
726 _generated_rpc.RpcClientConfig.__init__(self)
727 _generated_rpc.RpcClientBootstrap.__init__(self)
728 _generated_rpc.RpcClientDnsOnly.__init__(self)
729 _generated_rpc.RpcClientDefault.__init__(self)
731 def _NicDict(self, nic):
732 """Convert the given nic to a dict and encapsulate netinfo
735 n = copy.deepcopy(nic)
737 net_uuid = self._cfg.LookupNetwork(n.network)
739 nobj = self._cfg.GetNetwork(net_uuid)
740 n.netinfo = objects.Network.ToDict(nobj)
def _DeviceDict(self, dev_and_instance):
  """Encodes a NIC or a disk belonging to an instance.

  The argument is unpacked explicitly instead of through a tuple parameter
  (not valid syntax on Python 3); callers keep passing one
  (device, instance) pair.

  @type dev_and_instance: tuple; (device, instance)
  @param dev_and_instance: the device to encode and its instance; an
    L{objects.NIC} is encoded by C{_NicDict}, an L{objects.Disk} by
    C{_SingleDiskDictDP}
  @return: the encoded device, or None if the device is neither a NIC nor
    a disk

  """
  (device, instance) = dev_and_instance

  if isinstance(device, objects.NIC):
    return self._NicDict(device)

  if isinstance(device, objects.Disk):
    return self._SingleDiskDictDP((device, instance))

  # Unsupported device type; the result has always been None in this case
  return None
749 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
750 """Convert the given instance to a dict.
752 This is done via the instance's ToDict() method and additionally
753 we fill the hvparams with the cluster defaults.
755 @type instance: L{objects.Instance}
756 @param instance: an Instance object
757 @type hvp: dict or None
758 @param hvp: a dictionary with overridden hypervisor parameters
759 @type bep: dict or None
760 @param bep: a dictionary with overridden backend parameters
761 @type osp: dict or None
762 @param osp: a dictionary with overridden os parameters
764 @return: the instance dict, with the hvparams filled with the
768 idict = instance.ToDict()
769 cluster = self._cfg.GetClusterInfo()
770 idict["hvparams"] = cluster.FillHV(instance)
772 idict["hvparams"].update(hvp)
773 idict["beparams"] = cluster.FillBE(instance)
775 idict["beparams"].update(bep)
776 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
778 idict["osparams"].update(osp)
779 idict["disks"] = self._DisksDictDP((instance.disks, instance))
780 for nic in idict["nics"]:
781 nic["nicparams"] = objects.FillDict(
782 cluster.nicparams[constants.PP_DEFAULT],
784 network = nic.get("network", None)
786 net_uuid = self._cfg.LookupNetwork(network)
788 nobj = self._cfg.GetNetwork(net_uuid)
789 nic["netinfo"] = objects.Network.ToDict(nobj)
792 def _InstDictHvpBepDp(self, (instance, hvp, bep)):
793 """Wrapper for L{_InstDict}.
796 return self._InstDict(instance, hvp=hvp, bep=bep)
798 def _InstDictOspDp(self, (instance, osparams)):
799 """Wrapper for L{_InstDict}.
802 return self._InstDict(instance, osp=osparams)
def _DisksDictDP(self, disks_and_instance):
  """Wrapper for L{AnnotateDiskParams}.

  Annotates the disks with the disk parameters the configuration reports
  for the instance and converts each result through C{ToDict()}.

  @type disks_and_instance: tuple; (disks, instance)
  @param disks_and_instance: the disks to encode and the instance whose
    disk template and disk parameters are applied
  @rtype: list
  @return: one dictionary per disk, in input order

  """
  (disks, instance) = disks_and_instance

  diskparams = self._cfg.GetInstanceDiskParams(instance)
  return [disk.ToDict()
          for disk in AnnotateDiskParams(instance.disk_template,
                                         disks, diskparams)]
813 def _SingleDiskDictDP(self, (disk, instance)):
814 """Wrapper for L{AnnotateDiskParams}.
817 (anno_disk,) = self._DisksDictDP(([disk], instance))
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: object whose C{glm.AddToLockMonitor} is registered as
      lock monitor callback
    @param address_list: None to resolve nodes through L{_SsconfResolver}
      using the ssconf IPs, otherwise the addresses handed to
      L{_StaticResolver}

    """
    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
840 class BootstrapRunner(_RpcClientBase,
841 _generated_rpc.RpcClientBootstrap,
842 _generated_rpc.RpcClientDnsOnly):
843 """RPC wrappers for bootstrapping.
847 """Initializes this class.
850 # Pylint doesn't recognize multiple inheritance properly, see
851 # <http://www.logilab.org/ticket/36586> and
852 # <http://www.logilab.org/ticket/35642>
853 # pylint: disable=W0233
854 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
856 _generated_rpc.RpcClientBootstrap.__init__(self)
857 _generated_rpc.RpcClientDnsOnly.__init__(self)
860 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
861 """RPC wrappers for calls using only DNS.
865 """Initialize this class.
868 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
870 _generated_rpc.RpcClientDnsOnly.__init__(self)
873 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
874 """RPC wrappers for L{config}.
877 def __init__(self, context, address_list, _req_process_fn=None,
879 """Initializes this class.
883 lock_monitor_cb = context.glm.AddToLockMonitor
885 lock_monitor_cb = None
887 if address_list is None:
888 resolver = compat.partial(_SsconfResolver, True)
890 # Caller provided an address list
891 resolver = _StaticResolver(address_list)
893 encoders = _ENCODERS.copy()
896 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
899 _RpcClientBase.__init__(self, resolver, encoders.get,
900 lock_monitor_cb=lock_monitor_cb,
901 _req_process_fn=_req_process_fn)
902 _generated_rpc.RpcClientConfig.__init__(self)