Expand Objects.hs definitions
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 _RPC_CLIENT_HEADERS = [
59   "Content-type: %s" % http.HTTP_APP_JSON,
60   "Expect:",
61   ]
62
63 #: Special value to describe an offline host
64 _OFFLINE = object()
65
66
67 def Init():
68   """Initializes the module-global HTTP client manager.
69
70   Must be called before using any RPC function and while exactly one thread is
71   running.
72
73   """
74   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
75   # one thread running. This check is just a safety measure -- it doesn't
76   # cover all cases.
77   assert threading.activeCount() == 1, \
78          "Found more than one active thread when initializing pycURL"
79
80   logging.info("Using PycURL %s", pycurl.version)
81
82   pycurl.global_init(pycurl.GLOBAL_ALL)
83
84
85 def Shutdown():
86   """Stops the module-global HTTP client manager.
87
88   Must be called before quitting the program and while exactly one thread is
89   running.
90
91   """
92   pycurl.global_cleanup()
93
94
95 def _ConfigRpcCurl(curl):
96   noded_cert = str(constants.NODED_CERT_FILE)
97
98   curl.setopt(pycurl.FOLLOWLOCATION, False)
99   curl.setopt(pycurl.CAINFO, noded_cert)
100   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
101   curl.setopt(pycurl.SSL_VERIFYPEER, True)
102   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
103   curl.setopt(pycurl.SSLCERT, noded_cert)
104   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
105   curl.setopt(pycurl.SSLKEY, noded_cert)
106   curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
107
108
109 def RunWithRPC(fn):
110   """RPC-wrapper decorator.
111
112   When applied to a function, it runs it with the RPC system
113   initialized, and it shutsdown the system afterwards. This means the
114   function must be called without RPC being initialized.
115
116   """
117   def wrapper(*args, **kwargs):
118     Init()
119     try:
120       return fn(*args, **kwargs)
121     finally:
122       Shutdown()
123   return wrapper
124
125
126 def _Compress(data):
127   """Compresses a string for transport over RPC.
128
129   Small amounts of data are not compressed.
130
131   @type data: str
132   @param data: Data
133   @rtype: tuple
134   @return: Encoded data to send
135
136   """
137   # Small amounts of data are not compressed
138   if len(data) < 512:
139     return (constants.RPC_ENCODING_NONE, data)
140
141   # Compress with zlib and encode in base64
142   return (constants.RPC_ENCODING_ZLIB_BASE64,
143           base64.b64encode(zlib.compress(data, 3)))
144
145
146 class RpcResult(object):
147   """RPC Result class.
148
149   This class holds an RPC result. It is needed since in multi-node
150   calls we can't raise an exception just because one one out of many
151   failed, and therefore we use this class to encapsulate the result.
152
153   @ivar data: the data payload, for successful results, or None
154   @ivar call: the name of the RPC call
155   @ivar node: the name of the node to which we made the call
156   @ivar offline: whether the operation failed because the node was
157       offline, as opposed to actual failure; offline=True will always
158       imply failed=True, in order to allow simpler checking if
159       the user doesn't care about the exact failure mode
160   @ivar fail_msg: the error message if the call failed
161
162   """
163   def __init__(self, data=None, failed=False, offline=False,
164                call=None, node=None):
165     self.offline = offline
166     self.call = call
167     self.node = node
168
169     if offline:
170       self.fail_msg = "Node is marked offline"
171       self.data = self.payload = None
172     elif failed:
173       self.fail_msg = self._EnsureErr(data)
174       self.data = self.payload = None
175     else:
176       self.data = data
177       if not isinstance(self.data, (tuple, list)):
178         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
179                          type(self.data))
180         self.payload = None
181       elif len(data) != 2:
182         self.fail_msg = ("RPC layer error: invalid result length (%d), "
183                          "expected 2" % len(self.data))
184         self.payload = None
185       elif not self.data[0]:
186         self.fail_msg = self._EnsureErr(self.data[1])
187         self.payload = None
188       else:
189         # finally success
190         self.fail_msg = None
191         self.payload = data[1]
192
193     for attr_name in ["call", "data", "fail_msg",
194                       "node", "offline", "payload"]:
195       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
196
197   @staticmethod
198   def _EnsureErr(val):
199     """Helper to ensure we return a 'True' value for error."""
200     if val:
201       return val
202     else:
203       return "No error information"
204
205   def Raise(self, msg, prereq=False, ecode=None):
206     """If the result has failed, raise an OpExecError.
207
208     This is used so that LU code doesn't have to check for each
209     result, but instead can call this function.
210
211     """
212     if not self.fail_msg:
213       return
214
215     if not msg: # one could pass None for default message
216       msg = ("Call '%s' to node '%s' has failed: %s" %
217              (self.call, self.node, self.fail_msg))
218     else:
219       msg = "%s: %s" % (msg, self.fail_msg)
220     if prereq:
221       ec = errors.OpPrereqError
222     else:
223       ec = errors.OpExecError
224     if ecode is not None:
225       args = (msg, ecode)
226     else:
227       args = (msg, )
228     raise ec(*args) # pylint: disable=W0142
229
230
231 def _SsconfResolver(ssconf_ips, node_list, _,
232                     ssc=ssconf.SimpleStore,
233                     nslookup_fn=netutils.Hostname.GetIP):
234   """Return addresses for given node names.
235
236   @type ssconf_ips: bool
237   @param ssconf_ips: Use the ssconf IPs
238   @type node_list: list
239   @param node_list: List of node names
240   @type ssc: class
241   @param ssc: SimpleStore class that is used to obtain node->ip mappings
242   @type nslookup_fn: callable
243   @param nslookup_fn: function use to do NS lookup
244   @rtype: list of tuple; (string, string)
245   @return: List of tuples containing node name and IP address
246
247   """
248   ss = ssc()
249   family = ss.GetPrimaryIPFamily()
250
251   if ssconf_ips:
252     iplist = ss.GetNodePrimaryIPList()
253     ipmap = dict(entry.split() for entry in iplist)
254   else:
255     ipmap = {}
256
257   result = []
258   for node in node_list:
259     ip = ipmap.get(node)
260     if ip is None:
261       ip = nslookup_fn(node, family=family)
262     result.append((node, ip))
263
264   return result
265
266
267 class _StaticResolver:
268   def __init__(self, addresses):
269     """Initializes this class.
270
271     """
272     self._addresses = addresses
273
274   def __call__(self, hosts, _):
275     """Returns static addresses for hosts.
276
277     """
278     assert len(hosts) == len(self._addresses)
279     return zip(hosts, self._addresses)
280
281
282 def _CheckConfigNode(name, node, accept_offline_node):
283   """Checks if a node is online.
284
285   @type name: string
286   @param name: Node name
287   @type node: L{objects.Node} or None
288   @param node: Node object
289
290   """
291   if node is None:
292     # Depend on DNS for name resolution
293     ip = name
294   elif node.offline and not accept_offline_node:
295     ip = _OFFLINE
296   else:
297     ip = node.primary_ip
298   return (name, ip)
299
300
301 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
302   """Calculate node addresses using configuration.
303
304   """
305   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
306
307   assert accept_offline_node or opts is None, "Unknown option"
308
309   # Special case for single-host lookups
310   if len(hosts) == 1:
311     (name, ) = hosts
312     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
313   else:
314     all_nodes = all_nodes_fn()
315     return [_CheckConfigNode(name, all_nodes.get(name, None),
316                              accept_offline_node)
317             for name in hosts]
318
319
320 class _RpcProcessor:
321   def __init__(self, resolver, port, lock_monitor_cb=None):
322     """Initializes this class.
323
324     @param resolver: callable accepting a list of hostnames, returning a list
325       of tuples containing name and IP address (IP address can be the name or
326       the special value L{_OFFLINE} to mark offline machines)
327     @type port: int
328     @param port: TCP port
329     @param lock_monitor_cb: Callable for registering with lock monitor
330
331     """
332     self._resolver = resolver
333     self._port = port
334     self._lock_monitor_cb = lock_monitor_cb
335
336   @staticmethod
337   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338     """Prepares requests by sorting offline hosts into separate list.
339
340     @type body: dict
341     @param body: a dictionary with per-host body data
342
343     """
344     results = {}
345     requests = {}
346
347     assert isinstance(body, dict)
348     assert len(body) == len(hosts)
349     assert compat.all(isinstance(v, str) for v in body.values())
350     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
351         "%s != %s" % (hosts, body.keys())
352
353     for (name, ip) in hosts:
354       if ip is _OFFLINE:
355         # Node is marked as offline
356         results[name] = RpcResult(node=name, offline=True, call=procedure)
357       else:
358         requests[name] = \
359           http.client.HttpClientRequest(str(ip), port,
360                                         http.HTTP_POST, str("/%s" % procedure),
361                                         headers=_RPC_CLIENT_HEADERS,
362                                         post_data=body[name],
363                                         read_timeout=read_timeout,
364                                         nicename="%s/%s" % (name, procedure),
365                                         curl_config_fn=_ConfigRpcCurl)
366
367     return (results, requests)
368
369   @staticmethod
370   def _CombineResults(results, requests, procedure):
371     """Combines pre-computed results for offline hosts with actual call results.
372
373     """
374     for name, req in requests.items():
375       if req.success and req.resp_status_code == http.HTTP_OK:
376         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
377                                 node=name, call=procedure)
378       else:
379         # TODO: Better error reporting
380         if req.error:
381           msg = req.error
382         else:
383           msg = req.resp_body
384
385         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
386         host_result = RpcResult(data=msg, failed=True, node=name,
387                                 call=procedure)
388
389       results[name] = host_result
390
391     return results
392
393   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
394                _req_process_fn=None):
395     """Makes an RPC request to a number of nodes.
396
397     @type hosts: sequence
398     @param hosts: Hostnames
399     @type procedure: string
400     @param procedure: Request path
401     @type body: dictionary
402     @param body: dictionary with request bodies per host
403     @type read_timeout: int or None
404     @param read_timeout: Read timeout for request
405
406     """
407     assert read_timeout is not None, \
408       "Missing RPC read timeout for procedure '%s'" % procedure
409
410     if _req_process_fn is None:
411       _req_process_fn = http.client.ProcessRequests
412
413     (results, requests) = \
414       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
415                             procedure, body, read_timeout)
416
417     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
418
419     assert not frozenset(results).intersection(requests)
420
421     return self._CombineResults(results, requests, procedure)
422
423
424 class _RpcClientBase:
425   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
426                _req_process_fn=None):
427     """Initializes this class.
428
429     """
430     proc = _RpcProcessor(resolver,
431                          netutils.GetDaemonPort(constants.NODED),
432                          lock_monitor_cb=lock_monitor_cb)
433     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
434     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
435
436   @staticmethod
437   def _EncodeArg(encoder_fn, (argkind, value)):
438     """Encode argument.
439
440     """
441     if argkind is None:
442       return value
443     else:
444       return encoder_fn(argkind)(value)
445
446   def _Call(self, cdef, node_list, args):
447     """Entry point for automatically generated RPC wrappers.
448
449     """
450     (procedure, _, resolver_opts, timeout, argdefs,
451      prep_fn, postproc_fn, _) = cdef
452
453     if callable(timeout):
454       read_timeout = timeout(args)
455     else:
456       read_timeout = timeout
457
458     if callable(resolver_opts):
459       req_resolver_opts = resolver_opts(args)
460     else:
461       req_resolver_opts = resolver_opts
462
463     if len(args) != len(argdefs):
464       raise errors.ProgrammerError("Number of passed arguments doesn't match")
465
466     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
467     if prep_fn is None:
468       # for a no-op prep_fn, we serialise the body once, and then we
469       # reuse it in the dictionary values
470       body = serializer.DumpJson(enc_args)
471       pnbody = dict((n, body) for n in node_list)
472     else:
473       # for a custom prep_fn, we pass the encoded arguments and the
474       # node name to the prep_fn, and we serialise its return value
475       assert callable(prep_fn)
476       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
477                     for n in node_list)
478
479     result = self._proc(node_list, procedure, pnbody, read_timeout,
480                         req_resolver_opts)
481
482     if postproc_fn:
483       return dict(map(lambda (key, value): (key, postproc_fn(value)),
484                       result.items()))
485     else:
486       return result
487
488
489 def _ObjectToDict(value):
490   """Converts an object to a dictionary.
491
492   @note: See L{objects}.
493
494   """
495   return value.ToDict()
496
497
498 def _ObjectListToDict(value):
499   """Converts a list of L{objects} to dictionaries.
500
501   """
502   return map(_ObjectToDict, value)
503
504
505 def _EncodeNodeToDiskDict(value):
506   """Encodes a dictionary with node name as key and disk objects as values.
507
508   """
509   return dict((name, _ObjectListToDict(disks))
510               for name, disks in value.items())
511
512
513 def _PrepareFileUpload(getents_fn, filename):
514   """Loads a file and prepares it for an upload to nodes.
515
516   """
517   statcb = utils.FileStatHelper()
518   data = _Compress(utils.ReadFile(filename, preread=statcb))
519   st = statcb.st
520
521   if getents_fn is None:
522     getents_fn = runtime.GetEnts
523
524   getents = getents_fn()
525
526   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
527           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
528
529
530 def _PrepareFinalizeExportDisks(snap_disks):
531   """Encodes disks for finalizing export.
532
533   """
534   flat_disks = []
535
536   for disk in snap_disks:
537     if isinstance(disk, bool):
538       flat_disks.append(disk)
539     else:
540       flat_disks.append(disk.ToDict())
541
542   return flat_disks
543
544
545 def _EncodeImportExportIO((ieio, ieioargs)):
546   """Encodes import/export I/O information.
547
548   """
549   if ieio == constants.IEIO_RAW_DISK:
550     assert len(ieioargs) == 1
551     return (ieio, (ieioargs[0].ToDict(), ))
552
553   if ieio == constants.IEIO_SCRIPT:
554     assert len(ieioargs) == 2
555     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
556
557   return (ieio, ieioargs)
558
559
560 def _EncodeBlockdevRename(value):
561   """Encodes information for renaming block devices.
562
563   """
564   return [(d.ToDict(), uid) for d, uid in value]
565
566
567 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
568   """Annotates just DRBD disks layouts.
569
570   """
571   assert disk.dev_type == constants.LD_DRBD8
572
573   disk.params = objects.FillDict(drbd_params, disk.params)
574   (dev_data, dev_meta) = disk.children
575   dev_data.params = objects.FillDict(data_params, dev_data.params)
576   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
577
578   return disk
579
580
581 def _AnnotateDParamsGeneric(disk, (params, )):
582   """Generic disk parameter annotation routine.
583
584   """
585   assert disk.dev_type != constants.LD_DRBD8
586
587   disk.params = objects.FillDict(params, disk.params)
588
589   return disk
590
591
592 def AnnotateDiskParams(template, disks, disk_params):
593   """Annotates the disk objects with the disk parameters.
594
595   @param template: The disk template used
596   @param disks: The list of disks objects to annotate
597   @param disk_params: The disk paramaters for annotation
598   @returns: A list of disk objects annotated
599
600   """
601   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
602
603   if template == constants.DT_DRBD8:
604     annotation_fn = _AnnotateDParamsDRBD
605   elif template == constants.DT_DISKLESS:
606     annotation_fn = lambda disk, _: disk
607   else:
608     annotation_fn = _AnnotateDParamsGeneric
609
610   new_disks = []
611   for disk in disks:
612     new_disks.append(annotation_fn(disk.Copy(), ld_params))
613
614   return new_disks
615
616
617 #: Generic encoders
618 _ENCODERS = {
619   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
620   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
621   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
622   rpc_defs.ED_COMPRESS: _Compress,
623   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
624   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
625   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
626   }
627
628
629 class RpcRunner(_RpcClientBase,
630                 _generated_rpc.RpcClientDefault,
631                 _generated_rpc.RpcClientBootstrap,
632                 _generated_rpc.RpcClientDnsOnly,
633                 _generated_rpc.RpcClientConfig):
634   """RPC runner class.
635
636   """
637   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
638     """Initialized the RPC runner.
639
640     @type cfg: L{config.ConfigWriter}
641     @param cfg: Configuration
642     @type lock_monitor_cb: callable
643     @param lock_monitor_cb: Lock monitor callback
644
645     """
646     self._cfg = cfg
647
648     encoders = _ENCODERS.copy()
649
650     encoders.update({
651       # Encoders requiring configuration object
652       rpc_defs.ED_INST_DICT: self._InstDict,
653       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
654       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
655
656       # Encoders annotating disk parameters
657       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
658       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
659
660       # Encoders with special requirements
661       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
662       })
663
664     # Resolver using configuration
665     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
666                               cfg.GetAllNodesInfo)
667
668     # Pylint doesn't recognize multiple inheritance properly, see
669     # <http://www.logilab.org/ticket/36586> and
670     # <http://www.logilab.org/ticket/35642>
671     # pylint: disable=W0233
672     _RpcClientBase.__init__(self, resolver, encoders.get,
673                             lock_monitor_cb=lock_monitor_cb,
674                             _req_process_fn=_req_process_fn)
675     _generated_rpc.RpcClientConfig.__init__(self)
676     _generated_rpc.RpcClientBootstrap.__init__(self)
677     _generated_rpc.RpcClientDnsOnly.__init__(self)
678     _generated_rpc.RpcClientDefault.__init__(self)
679
680   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
681     """Convert the given instance to a dict.
682
683     This is done via the instance's ToDict() method and additionally
684     we fill the hvparams with the cluster defaults.
685
686     @type instance: L{objects.Instance}
687     @param instance: an Instance object
688     @type hvp: dict or None
689     @param hvp: a dictionary with overridden hypervisor parameters
690     @type bep: dict or None
691     @param bep: a dictionary with overridden backend parameters
692     @type osp: dict or None
693     @param osp: a dictionary with overridden os parameters
694     @rtype: dict
695     @return: the instance dict, with the hvparams filled with the
696         cluster defaults
697
698     """
699     idict = instance.ToDict()
700     cluster = self._cfg.GetClusterInfo()
701     idict["hvparams"] = cluster.FillHV(instance)
702     if hvp is not None:
703       idict["hvparams"].update(hvp)
704     idict["beparams"] = cluster.FillBE(instance)
705     if bep is not None:
706       idict["beparams"].update(bep)
707     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
708     if osp is not None:
709       idict["osparams"].update(osp)
710     for nic in idict["nics"]:
711       nic["nicparams"] = objects.FillDict(
712         cluster.nicparams[constants.PP_DEFAULT],
713         nic["nicparams"])
714     return idict
715
716   def _InstDictHvpBep(self, (instance, hvp, bep)):
717     """Wrapper for L{_InstDict}.
718
719     """
720     return self._InstDict(instance, hvp=hvp, bep=bep)
721
722   def _InstDictOspDp(self, (instance, osparams)):
723     """Wrapper for L{_InstDict}.
724
725     """
726     updated_inst = self._InstDict(instance, osp=osparams)
727     updated_inst["disks"] = self._DisksDictDP((instance.disks, instance))
728     return updated_inst
729
730   def _DisksDictDP(self, (disks, instance)):
731     """Wrapper for L{AnnotateDiskParams}.
732
733     """
734     diskparams = self._cfg.GetInstanceDiskParams(instance)
735     return [disk.ToDict()
736             for disk in AnnotateDiskParams(instance.disk_template,
737                                            disks, diskparams)]
738
739   def _SingleDiskDictDP(self, (disk, instance)):
740     """Wrapper for L{AnnotateDiskParams}.
741
742     """
743     (anno_disk,) = self._DisksDictDP(([disk], instance))
744     return anno_disk
745
746
747 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
748   """RPC wrappers for job queue.
749
750   """
751   def __init__(self, context, address_list):
752     """Initializes this class.
753
754     """
755     if address_list is None:
756       resolver = compat.partial(_SsconfResolver, True)
757     else:
758       # Caller provided an address list
759       resolver = _StaticResolver(address_list)
760
761     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
762                             lock_monitor_cb=context.glm.AddToLockMonitor)
763     _generated_rpc.RpcClientJobQueue.__init__(self)
764
765
766 class BootstrapRunner(_RpcClientBase,
767                       _generated_rpc.RpcClientBootstrap,
768                       _generated_rpc.RpcClientDnsOnly):
769   """RPC wrappers for bootstrapping.
770
771   """
772   def __init__(self):
773     """Initializes this class.
774
775     """
776     # Pylint doesn't recognize multiple inheritance properly, see
777     # <http://www.logilab.org/ticket/36586> and
778     # <http://www.logilab.org/ticket/35642>
779     # pylint: disable=W0233
780     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
781                             _ENCODERS.get)
782     _generated_rpc.RpcClientBootstrap.__init__(self)
783     _generated_rpc.RpcClientDnsOnly.__init__(self)
784
785
786 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
787   """RPC wrappers for calls using only DNS.
788
789   """
790   def __init__(self):
791     """Initialize this class.
792
793     """
794     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
795                             _ENCODERS.get)
796     _generated_rpc.RpcClientDnsOnly.__init__(self)
797
798
799 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
800   """RPC wrappers for L{config}.
801
802   """
803   def __init__(self, context, address_list, _req_process_fn=None,
804                _getents=None):
805     """Initializes this class.
806
807     """
808     if context:
809       lock_monitor_cb = context.glm.AddToLockMonitor
810     else:
811       lock_monitor_cb = None
812
813     if address_list is None:
814       resolver = compat.partial(_SsconfResolver, True)
815     else:
816       # Caller provided an address list
817       resolver = _StaticResolver(address_list)
818
819     encoders = _ENCODERS.copy()
820
821     encoders.update({
822       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
823       })
824
825     _RpcClientBase.__init__(self, resolver, encoders.get,
826                             lock_monitor_cb=lock_monitor_cb,
827                             _req_process_fn=_req_process_fn)
828     _generated_rpc.RpcClientConfig.__init__(self)