objects, rpc: Code cleanup
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 _RPC_CLIENT_HEADERS = [
59   "Content-type: %s" % http.HTTP_APP_JSON,
60   "Expect:",
61   ]
62
63 #: Special value to describe an offline host
64 _OFFLINE = object()
65
66
67 def Init():
68   """Initializes the module-global HTTP client manager.
69
70   Must be called before using any RPC function and while exactly one thread is
71   running.
72
73   """
74   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
75   # one thread running. This check is just a safety measure -- it doesn't
76   # cover all cases.
77   assert threading.activeCount() == 1, \
78          "Found more than one active thread when initializing pycURL"
79
80   logging.info("Using PycURL %s", pycurl.version)
81
82   pycurl.global_init(pycurl.GLOBAL_ALL)
83
84
85 def Shutdown():
86   """Stops the module-global HTTP client manager.
87
88   Must be called before quitting the program and while exactly one thread is
89   running.
90
91   """
92   pycurl.global_cleanup()
93
94
95 def _ConfigRpcCurl(curl):
96   noded_cert = str(constants.NODED_CERT_FILE)
97
98   curl.setopt(pycurl.FOLLOWLOCATION, False)
99   curl.setopt(pycurl.CAINFO, noded_cert)
100   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
101   curl.setopt(pycurl.SSL_VERIFYPEER, True)
102   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
103   curl.setopt(pycurl.SSLCERT, noded_cert)
104   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
105   curl.setopt(pycurl.SSLKEY, noded_cert)
106   curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
107
108
109 def RunWithRPC(fn):
110   """RPC-wrapper decorator.
111
112   When applied to a function, it runs it with the RPC system
113   initialized, and it shutsdown the system afterwards. This means the
114   function must be called without RPC being initialized.
115
116   """
117   def wrapper(*args, **kwargs):
118     Init()
119     try:
120       return fn(*args, **kwargs)
121     finally:
122       Shutdown()
123   return wrapper
124
125
126 def _Compress(data):
127   """Compresses a string for transport over RPC.
128
129   Small amounts of data are not compressed.
130
131   @type data: str
132   @param data: Data
133   @rtype: tuple
134   @return: Encoded data to send
135
136   """
137   # Small amounts of data are not compressed
138   if len(data) < 512:
139     return (constants.RPC_ENCODING_NONE, data)
140
141   # Compress with zlib and encode in base64
142   return (constants.RPC_ENCODING_ZLIB_BASE64,
143           base64.b64encode(zlib.compress(data, 3)))
144
145
146 class RpcResult(object):
147   """RPC Result class.
148
149   This class holds an RPC result. It is needed since in multi-node
150   calls we can't raise an exception just because one one out of many
151   failed, and therefore we use this class to encapsulate the result.
152
153   @ivar data: the data payload, for successful results, or None
154   @ivar call: the name of the RPC call
155   @ivar node: the name of the node to which we made the call
156   @ivar offline: whether the operation failed because the node was
157       offline, as opposed to actual failure; offline=True will always
158       imply failed=True, in order to allow simpler checking if
159       the user doesn't care about the exact failure mode
160   @ivar fail_msg: the error message if the call failed
161
162   """
163   def __init__(self, data=None, failed=False, offline=False,
164                call=None, node=None):
165     self.offline = offline
166     self.call = call
167     self.node = node
168
169     if offline:
170       self.fail_msg = "Node is marked offline"
171       self.data = self.payload = None
172     elif failed:
173       self.fail_msg = self._EnsureErr(data)
174       self.data = self.payload = None
175     else:
176       self.data = data
177       if not isinstance(self.data, (tuple, list)):
178         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
179                          type(self.data))
180         self.payload = None
181       elif len(data) != 2:
182         self.fail_msg = ("RPC layer error: invalid result length (%d), "
183                          "expected 2" % len(self.data))
184         self.payload = None
185       elif not self.data[0]:
186         self.fail_msg = self._EnsureErr(self.data[1])
187         self.payload = None
188       else:
189         # finally success
190         self.fail_msg = None
191         self.payload = data[1]
192
193     for attr_name in ["call", "data", "fail_msg",
194                       "node", "offline", "payload"]:
195       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
196
197   @staticmethod
198   def _EnsureErr(val):
199     """Helper to ensure we return a 'True' value for error."""
200     if val:
201       return val
202     else:
203       return "No error information"
204
205   def Raise(self, msg, prereq=False, ecode=None):
206     """If the result has failed, raise an OpExecError.
207
208     This is used so that LU code doesn't have to check for each
209     result, but instead can call this function.
210
211     """
212     if not self.fail_msg:
213       return
214
215     if not msg: # one could pass None for default message
216       msg = ("Call '%s' to node '%s' has failed: %s" %
217              (self.call, self.node, self.fail_msg))
218     else:
219       msg = "%s: %s" % (msg, self.fail_msg)
220     if prereq:
221       ec = errors.OpPrereqError
222     else:
223       ec = errors.OpExecError
224     if ecode is not None:
225       args = (msg, ecode)
226     else:
227       args = (msg, )
228     raise ec(*args) # pylint: disable=W0142
229
230
231 def _SsconfResolver(ssconf_ips, node_list, _,
232                     ssc=ssconf.SimpleStore,
233                     nslookup_fn=netutils.Hostname.GetIP):
234   """Return addresses for given node names.
235
236   @type ssconf_ips: bool
237   @param ssconf_ips: Use the ssconf IPs
238   @type node_list: list
239   @param node_list: List of node names
240   @type ssc: class
241   @param ssc: SimpleStore class that is used to obtain node->ip mappings
242   @type nslookup_fn: callable
243   @param nslookup_fn: function use to do NS lookup
244   @rtype: list of tuple; (string, string)
245   @return: List of tuples containing node name and IP address
246
247   """
248   ss = ssc()
249   family = ss.GetPrimaryIPFamily()
250
251   if ssconf_ips:
252     iplist = ss.GetNodePrimaryIPList()
253     ipmap = dict(entry.split() for entry in iplist)
254   else:
255     ipmap = {}
256
257   result = []
258   for node in node_list:
259     ip = ipmap.get(node)
260     if ip is None:
261       ip = nslookup_fn(node, family=family)
262     result.append((node, ip))
263
264   return result
265
266
267 class _StaticResolver:
268   def __init__(self, addresses):
269     """Initializes this class.
270
271     """
272     self._addresses = addresses
273
274   def __call__(self, hosts, _):
275     """Returns static addresses for hosts.
276
277     """
278     assert len(hosts) == len(self._addresses)
279     return zip(hosts, self._addresses)
280
281
282 def _CheckConfigNode(name, node, accept_offline_node):
283   """Checks if a node is online.
284
285   @type name: string
286   @param name: Node name
287   @type node: L{objects.Node} or None
288   @param node: Node object
289
290   """
291   if node is None:
292     # Depend on DNS for name resolution
293     ip = name
294   elif node.offline and not accept_offline_node:
295     ip = _OFFLINE
296   else:
297     ip = node.primary_ip
298   return (name, ip)
299
300
301 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
302   """Calculate node addresses using configuration.
303
304   """
305   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
306
307   assert accept_offline_node or opts is None, "Unknown option"
308
309   # Special case for single-host lookups
310   if len(hosts) == 1:
311     (name, ) = hosts
312     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
313   else:
314     all_nodes = all_nodes_fn()
315     return [_CheckConfigNode(name, all_nodes.get(name, None),
316                              accept_offline_node)
317             for name in hosts]
318
319
320 class _RpcProcessor:
321   def __init__(self, resolver, port, lock_monitor_cb=None):
322     """Initializes this class.
323
324     @param resolver: callable accepting a list of hostnames, returning a list
325       of tuples containing name and IP address (IP address can be the name or
326       the special value L{_OFFLINE} to mark offline machines)
327     @type port: int
328     @param port: TCP port
329     @param lock_monitor_cb: Callable for registering with lock monitor
330
331     """
332     self._resolver = resolver
333     self._port = port
334     self._lock_monitor_cb = lock_monitor_cb
335
336   @staticmethod
337   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338     """Prepares requests by sorting offline hosts into separate list.
339
340     @type body: dict
341     @param body: a dictionary with per-host body data
342
343     """
344     results = {}
345     requests = {}
346
347     assert isinstance(body, dict)
348     assert len(body) == len(hosts)
349     assert compat.all(isinstance(v, str) for v in body.values())
350     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
351         "%s != %s" % (hosts, body.keys())
352
353     for (name, ip) in hosts:
354       if ip is _OFFLINE:
355         # Node is marked as offline
356         results[name] = RpcResult(node=name, offline=True, call=procedure)
357       else:
358         requests[name] = \
359           http.client.HttpClientRequest(str(ip), port,
360                                         http.HTTP_POST, str("/%s" % procedure),
361                                         headers=_RPC_CLIENT_HEADERS,
362                                         post_data=body[name],
363                                         read_timeout=read_timeout,
364                                         nicename="%s/%s" % (name, procedure),
365                                         curl_config_fn=_ConfigRpcCurl)
366
367     return (results, requests)
368
369   @staticmethod
370   def _CombineResults(results, requests, procedure):
371     """Combines pre-computed results for offline hosts with actual call results.
372
373     """
374     for name, req in requests.items():
375       if req.success and req.resp_status_code == http.HTTP_OK:
376         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
377                                 node=name, call=procedure)
378       else:
379         # TODO: Better error reporting
380         if req.error:
381           msg = req.error
382         else:
383           msg = req.resp_body
384
385         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
386         host_result = RpcResult(data=msg, failed=True, node=name,
387                                 call=procedure)
388
389       results[name] = host_result
390
391     return results
392
393   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
394                _req_process_fn=None):
395     """Makes an RPC request to a number of nodes.
396
397     @type hosts: sequence
398     @param hosts: Hostnames
399     @type procedure: string
400     @param procedure: Request path
401     @type body: dictionary
402     @param body: dictionary with request bodies per host
403     @type read_timeout: int or None
404     @param read_timeout: Read timeout for request
405
406     """
407     assert read_timeout is not None, \
408       "Missing RPC read timeout for procedure '%s'" % procedure
409
410     if _req_process_fn is None:
411       _req_process_fn = http.client.ProcessRequests
412
413     (results, requests) = \
414       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
415                             procedure, body, read_timeout)
416
417     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
418
419     assert not frozenset(results).intersection(requests)
420
421     return self._CombineResults(results, requests, procedure)
422
423
424 class _RpcClientBase:
425   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
426                _req_process_fn=None):
427     """Initializes this class.
428
429     """
430     proc = _RpcProcessor(resolver,
431                          netutils.GetDaemonPort(constants.NODED),
432                          lock_monitor_cb=lock_monitor_cb)
433     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
434     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
435
436   @staticmethod
437   def _EncodeArg(encoder_fn, (argkind, value)):
438     """Encode argument.
439
440     """
441     if argkind is None:
442       return value
443     else:
444       return encoder_fn(argkind)(value)
445
446   def _Call(self, cdef, node_list, args):
447     """Entry point for automatically generated RPC wrappers.
448
449     """
450     (procedure, _, resolver_opts, timeout, argdefs,
451      prep_fn, postproc_fn, _) = cdef
452
453     if callable(timeout):
454       read_timeout = timeout(args)
455     else:
456       read_timeout = timeout
457
458     if callable(resolver_opts):
459       req_resolver_opts = resolver_opts(args)
460     else:
461       req_resolver_opts = resolver_opts
462
463     if len(args) != len(argdefs):
464       raise errors.ProgrammerError("Number of passed arguments doesn't match")
465
466     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
467     if prep_fn is None:
468       # for a no-op prep_fn, we serialise the body once, and then we
469       # reuse it in the dictionary values
470       body = serializer.DumpJson(enc_args)
471       pnbody = dict((n, body) for n in node_list)
472     else:
473       # for a custom prep_fn, we pass the encoded arguments and the
474       # node name to the prep_fn, and we serialise its return value
475       assert callable(prep_fn)
476       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
477                     for n in node_list)
478
479     result = self._proc(node_list, procedure, pnbody, read_timeout,
480                         req_resolver_opts)
481
482     if postproc_fn:
483       return dict(map(lambda (key, value): (key, postproc_fn(value)),
484                       result.items()))
485     else:
486       return result
487
488
489 def _ObjectToDict(value):
490   """Converts an object to a dictionary.
491
492   @note: See L{objects}.
493
494   """
495   return value.ToDict()
496
497
498 def _ObjectListToDict(value):
499   """Converts a list of L{objects} to dictionaries.
500
501   """
502   return map(_ObjectToDict, value)
503
504
505 def _EncodeNodeToDiskDict(value):
506   """Encodes a dictionary with node name as key and disk objects as values.
507
508   """
509   return dict((name, _ObjectListToDict(disks))
510               for name, disks in value.items())
511
512
513 def _PrepareFileUpload(getents_fn, filename):
514   """Loads a file and prepares it for an upload to nodes.
515
516   """
517   statcb = utils.FileStatHelper()
518   data = _Compress(utils.ReadFile(filename, preread=statcb))
519   st = statcb.st
520
521   if getents_fn is None:
522     getents_fn = runtime.GetEnts
523
524   getents = getents_fn()
525
526   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
527           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
528
529
530 def _PrepareFinalizeExportDisks(snap_disks):
531   """Encodes disks for finalizing export.
532
533   """
534   flat_disks = []
535
536   for disk in snap_disks:
537     if isinstance(disk, bool):
538       flat_disks.append(disk)
539     else:
540       flat_disks.append(disk.ToDict())
541
542   return flat_disks
543
544
545 def _EncodeImportExportIO((ieio, ieioargs)):
546   """Encodes import/export I/O information.
547
548   """
549   if ieio == constants.IEIO_RAW_DISK:
550     assert len(ieioargs) == 1
551     return (ieio, (ieioargs[0].ToDict(), ))
552
553   if ieio == constants.IEIO_SCRIPT:
554     assert len(ieioargs) == 2
555     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
556
557   return (ieio, ieioargs)
558
559
560 def _EncodeBlockdevRename(value):
561   """Encodes information for renaming block devices.
562
563   """
564   return [(d.ToDict(), uid) for d, uid in value]
565
566
567 def MakeLegacyNodeInfo(data):
568   """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
569
570   Converts the data into a single dictionary. This is fine for most use cases,
571   but some require information from more than one volume group or hypervisor.
572
573   """
574   (bootid, (vg_info, ), (hv_info, )) = data
575
576   return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
577     "bootid": bootid,
578     })
579
580
581 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
582   """Annotates just DRBD disks layouts.
583
584   """
585   assert disk.dev_type == constants.LD_DRBD8
586
587   disk.params = objects.FillDict(drbd_params, disk.params)
588   (dev_data, dev_meta) = disk.children
589   dev_data.params = objects.FillDict(data_params, dev_data.params)
590   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
591
592   return disk
593
594
595 def _AnnotateDParamsGeneric(disk, (params, )):
596   """Generic disk parameter annotation routine.
597
598   """
599   assert disk.dev_type != constants.LD_DRBD8
600
601   disk.params = objects.FillDict(params, disk.params)
602
603   return disk
604
605
606 def AnnotateDiskParams(template, disks, disk_params):
607   """Annotates the disk objects with the disk parameters.
608
609   @param template: The disk template used
610   @param disks: The list of disks objects to annotate
611   @param disk_params: The disk paramaters for annotation
612   @returns: A list of disk objects annotated
613
614   """
615   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
616
617   if template == constants.DT_DRBD8:
618     annotation_fn = _AnnotateDParamsDRBD
619   elif template == constants.DT_DISKLESS:
620     annotation_fn = lambda disk, _: disk
621   else:
622     annotation_fn = _AnnotateDParamsGeneric
623
624   return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
625
626
627 #: Generic encoders
628 _ENCODERS = {
629   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
630   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
631   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
632   rpc_defs.ED_COMPRESS: _Compress,
633   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
634   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
635   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
636   }
637
638
639 class RpcRunner(_RpcClientBase,
640                 _generated_rpc.RpcClientDefault,
641                 _generated_rpc.RpcClientBootstrap,
642                 _generated_rpc.RpcClientDnsOnly,
643                 _generated_rpc.RpcClientConfig):
644   """RPC runner class.
645
646   """
647   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
648     """Initialized the RPC runner.
649
650     @type cfg: L{config.ConfigWriter}
651     @param cfg: Configuration
652     @type lock_monitor_cb: callable
653     @param lock_monitor_cb: Lock monitor callback
654
655     """
656     self._cfg = cfg
657
658     encoders = _ENCODERS.copy()
659
660     encoders.update({
661       # Encoders requiring configuration object
662       rpc_defs.ED_INST_DICT: self._InstDict,
663       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
664       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
665
666       # Encoders annotating disk parameters
667       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
668       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
669
670       # Encoders with special requirements
671       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
672       })
673
674     # Resolver using configuration
675     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
676                               cfg.GetAllNodesInfo)
677
678     # Pylint doesn't recognize multiple inheritance properly, see
679     # <http://www.logilab.org/ticket/36586> and
680     # <http://www.logilab.org/ticket/35642>
681     # pylint: disable=W0233
682     _RpcClientBase.__init__(self, resolver, encoders.get,
683                             lock_monitor_cb=lock_monitor_cb,
684                             _req_process_fn=_req_process_fn)
685     _generated_rpc.RpcClientConfig.__init__(self)
686     _generated_rpc.RpcClientBootstrap.__init__(self)
687     _generated_rpc.RpcClientDnsOnly.__init__(self)
688     _generated_rpc.RpcClientDefault.__init__(self)
689
690   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
691     """Convert the given instance to a dict.
692
693     This is done via the instance's ToDict() method and additionally
694     we fill the hvparams with the cluster defaults.
695
696     @type instance: L{objects.Instance}
697     @param instance: an Instance object
698     @type hvp: dict or None
699     @param hvp: a dictionary with overridden hypervisor parameters
700     @type bep: dict or None
701     @param bep: a dictionary with overridden backend parameters
702     @type osp: dict or None
703     @param osp: a dictionary with overridden os parameters
704     @rtype: dict
705     @return: the instance dict, with the hvparams filled with the
706         cluster defaults
707
708     """
709     idict = instance.ToDict()
710     cluster = self._cfg.GetClusterInfo()
711     idict["hvparams"] = cluster.FillHV(instance)
712     if hvp is not None:
713       idict["hvparams"].update(hvp)
714     idict["beparams"] = cluster.FillBE(instance)
715     if bep is not None:
716       idict["beparams"].update(bep)
717     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
718     if osp is not None:
719       idict["osparams"].update(osp)
720     for nic in idict["nics"]:
721       nic["nicparams"] = objects.FillDict(
722         cluster.nicparams[constants.PP_DEFAULT],
723         nic["nicparams"])
724     return idict
725
726   def _InstDictHvpBep(self, (instance, hvp, bep)):
727     """Wrapper for L{_InstDict}.
728
729     """
730     return self._InstDict(instance, hvp=hvp, bep=bep)
731
732   def _InstDictOspDp(self, (instance, osparams)):
733     """Wrapper for L{_InstDict}.
734
735     """
736     updated_inst = self._InstDict(instance, osp=osparams)
737     updated_inst["disks"] = self._DisksDictDP((instance.disks, instance))
738     return updated_inst
739
740   def _DisksDictDP(self, (disks, instance)):
741     """Wrapper for L{AnnotateDiskParams}.
742
743     """
744     diskparams = self._cfg.GetInstanceDiskParams(instance)
745     return [disk.ToDict()
746             for disk in AnnotateDiskParams(instance.disk_template,
747                                            disks, diskparams)]
748
749   def _SingleDiskDictDP(self, (disk, instance)):
750     """Wrapper for L{AnnotateDiskParams}.
751
752     """
753     (anno_disk,) = self._DisksDictDP(([disk], instance))
754     return anno_disk
755
756
757 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
758   """RPC wrappers for job queue.
759
760   """
761   def __init__(self, context, address_list):
762     """Initializes this class.
763
764     """
765     if address_list is None:
766       resolver = compat.partial(_SsconfResolver, True)
767     else:
768       # Caller provided an address list
769       resolver = _StaticResolver(address_list)
770
771     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
772                             lock_monitor_cb=context.glm.AddToLockMonitor)
773     _generated_rpc.RpcClientJobQueue.__init__(self)
774
775
776 class BootstrapRunner(_RpcClientBase,
777                       _generated_rpc.RpcClientBootstrap,
778                       _generated_rpc.RpcClientDnsOnly):
779   """RPC wrappers for bootstrapping.
780
781   """
782   def __init__(self):
783     """Initializes this class.
784
785     """
786     # Pylint doesn't recognize multiple inheritance properly, see
787     # <http://www.logilab.org/ticket/36586> and
788     # <http://www.logilab.org/ticket/35642>
789     # pylint: disable=W0233
790     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
791                             _ENCODERS.get)
792     _generated_rpc.RpcClientBootstrap.__init__(self)
793     _generated_rpc.RpcClientDnsOnly.__init__(self)
794
795
796 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
797   """RPC wrappers for calls using only DNS.
798
799   """
800   def __init__(self):
801     """Initialize this class.
802
803     """
804     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
805                             _ENCODERS.get)
806     _generated_rpc.RpcClientDnsOnly.__init__(self)
807
808
809 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
810   """RPC wrappers for L{config}.
811
812   """
813   def __init__(self, context, address_list, _req_process_fn=None,
814                _getents=None):
815     """Initializes this class.
816
817     """
818     if context:
819       lock_monitor_cb = context.glm.AddToLockMonitor
820     else:
821       lock_monitor_cb = None
822
823     if address_list is None:
824       resolver = compat.partial(_SsconfResolver, True)
825     else:
826       # Caller provided an address list
827       resolver = _StaticResolver(address_list)
828
829     encoders = _ENCODERS.copy()
830
831     encoders.update({
832       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
833       })
834
835     _RpcClientBase.__init__(self, resolver, encoders.get,
836                             lock_monitor_cb=lock_monitor_cb,
837                             _req_process_fn=_req_process_fn)
838     _generated_rpc.RpcClientConfig.__init__(self)