Merge branch 'stable-2.6' into devel-2.6
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
71 _TMO_4HRS = 4 * 3600
72 _TMO_1DAY = 86400
73
74 #: Special value to describe an offline host
75 _OFFLINE = object()
76
77
78 def Init():
79   """Initializes the module-global HTTP client manager.
80
81   Must be called before using any RPC function and while exactly one thread is
82   running.
83
84   """
85   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86   # one thread running. This check is just a safety measure -- it doesn't
87   # cover all cases.
88   assert threading.activeCount() == 1, \
89          "Found more than one active thread when initializing pycURL"
90
91   logging.info("Using PycURL %s", pycurl.version)
92
93   pycurl.global_init(pycurl.GLOBAL_ALL)
94
95
96 def Shutdown():
97   """Stops the module-global HTTP client manager.
98
99   Must be called before quitting the program and while exactly one thread is
100   running.
101
102   """
103   pycurl.global_cleanup()
104
105
106 def _ConfigRpcCurl(curl):
107   noded_cert = str(constants.NODED_CERT_FILE)
108
109   curl.setopt(pycurl.FOLLOWLOCATION, False)
110   curl.setopt(pycurl.CAINFO, noded_cert)
111   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112   curl.setopt(pycurl.SSL_VERIFYPEER, True)
113   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114   curl.setopt(pycurl.SSLCERT, noded_cert)
115   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116   curl.setopt(pycurl.SSLKEY, noded_cert)
117   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118
119
120 def RunWithRPC(fn):
121   """RPC-wrapper decorator.
122
123   When applied to a function, it runs it with the RPC system
124   initialized, and it shutsdown the system afterwards. This means the
125   function must be called without RPC being initialized.
126
127   """
128   def wrapper(*args, **kwargs):
129     Init()
130     try:
131       return fn(*args, **kwargs)
132     finally:
133       Shutdown()
134   return wrapper
135
136
137 def _Compress(data):
138   """Compresses a string for transport over RPC.
139
140   Small amounts of data are not compressed.
141
142   @type data: str
143   @param data: Data
144   @rtype: tuple
145   @return: Encoded data to send
146
147   """
148   # Small amounts of data are not compressed
149   if len(data) < 512:
150     return (constants.RPC_ENCODING_NONE, data)
151
152   # Compress with zlib and encode in base64
153   return (constants.RPC_ENCODING_ZLIB_BASE64,
154           base64.b64encode(zlib.compress(data, 3)))
155
156
157 class RpcResult(object):
158   """RPC Result class.
159
160   This class holds an RPC result. It is needed since in multi-node
161   calls we can't raise an exception just because one one out of many
162   failed, and therefore we use this class to encapsulate the result.
163
164   @ivar data: the data payload, for successful results, or None
165   @ivar call: the name of the RPC call
166   @ivar node: the name of the node to which we made the call
167   @ivar offline: whether the operation failed because the node was
168       offline, as opposed to actual failure; offline=True will always
169       imply failed=True, in order to allow simpler checking if
170       the user doesn't care about the exact failure mode
171   @ivar fail_msg: the error message if the call failed
172
173   """
174   def __init__(self, data=None, failed=False, offline=False,
175                call=None, node=None):
176     self.offline = offline
177     self.call = call
178     self.node = node
179
180     if offline:
181       self.fail_msg = "Node is marked offline"
182       self.data = self.payload = None
183     elif failed:
184       self.fail_msg = self._EnsureErr(data)
185       self.data = self.payload = None
186     else:
187       self.data = data
188       if not isinstance(self.data, (tuple, list)):
189         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190                          type(self.data))
191         self.payload = None
192       elif len(data) != 2:
193         self.fail_msg = ("RPC layer error: invalid result length (%d), "
194                          "expected 2" % len(self.data))
195         self.payload = None
196       elif not self.data[0]:
197         self.fail_msg = self._EnsureErr(self.data[1])
198         self.payload = None
199       else:
200         # finally success
201         self.fail_msg = None
202         self.payload = data[1]
203
204     for attr_name in ["call", "data", "fail_msg",
205                       "node", "offline", "payload"]:
206       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207
208   @staticmethod
209   def _EnsureErr(val):
210     """Helper to ensure we return a 'True' value for error."""
211     if val:
212       return val
213     else:
214       return "No error information"
215
216   def Raise(self, msg, prereq=False, ecode=None):
217     """If the result has failed, raise an OpExecError.
218
219     This is used so that LU code doesn't have to check for each
220     result, but instead can call this function.
221
222     """
223     if not self.fail_msg:
224       return
225
226     if not msg: # one could pass None for default message
227       msg = ("Call '%s' to node '%s' has failed: %s" %
228              (self.call, self.node, self.fail_msg))
229     else:
230       msg = "%s: %s" % (msg, self.fail_msg)
231     if prereq:
232       ec = errors.OpPrereqError
233     else:
234       ec = errors.OpExecError
235     if ecode is not None:
236       args = (msg, ecode)
237     else:
238       args = (msg, )
239     raise ec(*args) # pylint: disable=W0142
240
241
242 def _SsconfResolver(ssconf_ips, node_list, _,
243                     ssc=ssconf.SimpleStore,
244                     nslookup_fn=netutils.Hostname.GetIP):
245   """Return addresses for given node names.
246
247   @type ssconf_ips: bool
248   @param ssconf_ips: Use the ssconf IPs
249   @type node_list: list
250   @param node_list: List of node names
251   @type ssc: class
252   @param ssc: SimpleStore class that is used to obtain node->ip mappings
253   @type nslookup_fn: callable
254   @param nslookup_fn: function use to do NS lookup
255   @rtype: list of tuple; (string, string)
256   @return: List of tuples containing node name and IP address
257
258   """
259   ss = ssc()
260   family = ss.GetPrimaryIPFamily()
261
262   if ssconf_ips:
263     iplist = ss.GetNodePrimaryIPList()
264     ipmap = dict(entry.split() for entry in iplist)
265   else:
266     ipmap = {}
267
268   result = []
269   for node in node_list:
270     ip = ipmap.get(node)
271     if ip is None:
272       ip = nslookup_fn(node, family=family)
273     result.append((node, ip))
274
275   return result
276
277
278 class _StaticResolver:
279   def __init__(self, addresses):
280     """Initializes this class.
281
282     """
283     self._addresses = addresses
284
285   def __call__(self, hosts, _):
286     """Returns static addresses for hosts.
287
288     """
289     assert len(hosts) == len(self._addresses)
290     return zip(hosts, self._addresses)
291
292
293 def _CheckConfigNode(name, node, accept_offline_node):
294   """Checks if a node is online.
295
296   @type name: string
297   @param name: Node name
298   @type node: L{objects.Node} or None
299   @param node: Node object
300
301   """
302   if node is None:
303     # Depend on DNS for name resolution
304     ip = name
305   elif node.offline and not accept_offline_node:
306     ip = _OFFLINE
307   else:
308     ip = node.primary_ip
309   return (name, ip)
310
311
312 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
313   """Calculate node addresses using configuration.
314
315   """
316   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
317
318   assert accept_offline_node or opts is None, "Unknown option"
319
320   # Special case for single-host lookups
321   if len(hosts) == 1:
322     (name, ) = hosts
323     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
324   else:
325     all_nodes = all_nodes_fn()
326     return [_CheckConfigNode(name, all_nodes.get(name, None),
327                              accept_offline_node)
328             for name in hosts]
329
330
331 class _RpcProcessor:
332   def __init__(self, resolver, port, lock_monitor_cb=None):
333     """Initializes this class.
334
335     @param resolver: callable accepting a list of hostnames, returning a list
336       of tuples containing name and IP address (IP address can be the name or
337       the special value L{_OFFLINE} to mark offline machines)
338     @type port: int
339     @param port: TCP port
340     @param lock_monitor_cb: Callable for registering with lock monitor
341
342     """
343     self._resolver = resolver
344     self._port = port
345     self._lock_monitor_cb = lock_monitor_cb
346
347   @staticmethod
348   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
349     """Prepares requests by sorting offline hosts into separate list.
350
351     @type body: dict
352     @param body: a dictionary with per-host body data
353
354     """
355     results = {}
356     requests = {}
357
358     assert isinstance(body, dict)
359     assert len(body) == len(hosts)
360     assert compat.all(isinstance(v, str) for v in body.values())
361     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
362         "%s != %s" % (hosts, body.keys())
363
364     for (name, ip) in hosts:
365       if ip is _OFFLINE:
366         # Node is marked as offline
367         results[name] = RpcResult(node=name, offline=True, call=procedure)
368       else:
369         requests[name] = \
370           http.client.HttpClientRequest(str(ip), port,
371                                         http.HTTP_POST, str("/%s" % procedure),
372                                         headers=_RPC_CLIENT_HEADERS,
373                                         post_data=body[name],
374                                         read_timeout=read_timeout,
375                                         nicename="%s/%s" % (name, procedure),
376                                         curl_config_fn=_ConfigRpcCurl)
377
378     return (results, requests)
379
380   @staticmethod
381   def _CombineResults(results, requests, procedure):
382     """Combines pre-computed results for offline hosts with actual call results.
383
384     """
385     for name, req in requests.items():
386       if req.success and req.resp_status_code == http.HTTP_OK:
387         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
388                                 node=name, call=procedure)
389       else:
390         # TODO: Better error reporting
391         if req.error:
392           msg = req.error
393         else:
394           msg = req.resp_body
395
396         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
397         host_result = RpcResult(data=msg, failed=True, node=name,
398                                 call=procedure)
399
400       results[name] = host_result
401
402     return results
403
404   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
405                _req_process_fn=None):
406     """Makes an RPC request to a number of nodes.
407
408     @type hosts: sequence
409     @param hosts: Hostnames
410     @type procedure: string
411     @param procedure: Request path
412     @type body: dictionary
413     @param body: dictionary with request bodies per host
414     @type read_timeout: int or None
415     @param read_timeout: Read timeout for request
416
417     """
418     assert read_timeout is not None, \
419       "Missing RPC read timeout for procedure '%s'" % procedure
420
421     if _req_process_fn is None:
422       _req_process_fn = http.client.ProcessRequests
423
424     (results, requests) = \
425       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
426                             procedure, body, read_timeout)
427
428     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
429
430     assert not frozenset(results).intersection(requests)
431
432     return self._CombineResults(results, requests, procedure)
433
434
435 class _RpcClientBase:
436   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
437                _req_process_fn=None):
438     """Initializes this class.
439
440     """
441     proc = _RpcProcessor(resolver,
442                          netutils.GetDaemonPort(constants.NODED),
443                          lock_monitor_cb=lock_monitor_cb)
444     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
445     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
446
447   @staticmethod
448   def _EncodeArg(encoder_fn, (argkind, value)):
449     """Encode argument.
450
451     """
452     if argkind is None:
453       return value
454     else:
455       return encoder_fn(argkind)(value)
456
457   def _Call(self, cdef, node_list, args):
458     """Entry point for automatically generated RPC wrappers.
459
460     """
461     (procedure, _, resolver_opts, timeout, argdefs,
462      prep_fn, postproc_fn, _) = cdef
463
464     if callable(timeout):
465       read_timeout = timeout(args)
466     else:
467       read_timeout = timeout
468
469     if callable(resolver_opts):
470       req_resolver_opts = resolver_opts(args)
471     else:
472       req_resolver_opts = resolver_opts
473
474     if len(args) != len(argdefs):
475       raise errors.ProgrammerError("Number of passed arguments doesn't match")
476
477     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
478     if prep_fn is None:
479       # for a no-op prep_fn, we serialise the body once, and then we
480       # reuse it in the dictionary values
481       body = serializer.DumpJson(enc_args)
482       pnbody = dict((n, body) for n in node_list)
483     else:
484       # for a custom prep_fn, we pass the encoded arguments and the
485       # node name to the prep_fn, and we serialise its return value
486       assert callable(prep_fn)
487       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
488                     for n in node_list)
489
490     result = self._proc(node_list, procedure, pnbody, read_timeout,
491                         req_resolver_opts)
492
493     if postproc_fn:
494       return dict(map(lambda (key, value): (key, postproc_fn(value)),
495                       result.items()))
496     else:
497       return result
498
499
500 def _ObjectToDict(value):
501   """Converts an object to a dictionary.
502
503   @note: See L{objects}.
504
505   """
506   return value.ToDict()
507
508
509 def _ObjectListToDict(value):
510   """Converts a list of L{objects} to dictionaries.
511
512   """
513   return map(_ObjectToDict, value)
514
515
516 def _EncodeNodeToDiskDict(value):
517   """Encodes a dictionary with node name as key and disk objects as values.
518
519   """
520   return dict((name, _ObjectListToDict(disks))
521               for name, disks in value.items())
522
523
524 def _PrepareFileUpload(getents_fn, filename):
525   """Loads a file and prepares it for an upload to nodes.
526
527   """
528   statcb = utils.FileStatHelper()
529   data = _Compress(utils.ReadFile(filename, preread=statcb))
530   st = statcb.st
531
532   if getents_fn is None:
533     getents_fn = runtime.GetEnts
534
535   getents = getents_fn()
536
537   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
538           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
539
540
541 def _PrepareFinalizeExportDisks(snap_disks):
542   """Encodes disks for finalizing export.
543
544   """
545   flat_disks = []
546
547   for disk in snap_disks:
548     if isinstance(disk, bool):
549       flat_disks.append(disk)
550     else:
551       flat_disks.append(disk.ToDict())
552
553   return flat_disks
554
555
556 def _EncodeImportExportIO((ieio, ieioargs)):
557   """Encodes import/export I/O information.
558
559   """
560   if ieio == constants.IEIO_RAW_DISK:
561     assert len(ieioargs) == 1
562     return (ieio, (ieioargs[0].ToDict(), ))
563
564   if ieio == constants.IEIO_SCRIPT:
565     assert len(ieioargs) == 2
566     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
567
568   return (ieio, ieioargs)
569
570
571 def _EncodeBlockdevRename(value):
572   """Encodes information for renaming block devices.
573
574   """
575   return [(d.ToDict(), uid) for d, uid in value]
576
577
578 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
579   """Annotates just DRBD disks layouts.
580
581   """
582   assert disk.dev_type == constants.LD_DRBD8
583
584   disk.params = objects.FillDict(drbd_params, disk.params)
585   (dev_data, dev_meta) = disk.children
586   dev_data.params = objects.FillDict(data_params, dev_data.params)
587   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
588
589   return disk
590
591
592 def _AnnotateDParamsGeneric(disk, (params, )):
593   """Generic disk parameter annotation routine.
594
595   """
596   assert disk.dev_type != constants.LD_DRBD8
597
598   disk.params = objects.FillDict(params, disk.params)
599
600   return disk
601
602
603 def AnnotateDiskParams(template, disks, disk_params):
604   """Annotates the disk objects with the disk parameters.
605
606   @param template: The disk template used
607   @param disks: The list of disks objects to annotate
608   @param disk_params: The disk paramaters for annotation
609   @returns: A list of disk objects annotated
610
611   """
612   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
613
614   if template == constants.DT_DRBD8:
615     annotation_fn = _AnnotateDParamsDRBD
616   elif template == constants.DT_DISKLESS:
617     annotation_fn = lambda disk, _: disk
618   else:
619     annotation_fn = _AnnotateDParamsGeneric
620
621   new_disks = []
622   for disk in disks:
623     new_disks.append(annotation_fn(disk.Copy(), ld_params))
624
625   return new_disks
626
627
628 #: Generic encoders
629 _ENCODERS = {
630   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
631   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
632   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
633   rpc_defs.ED_COMPRESS: _Compress,
634   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
635   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
636   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
637   }
638
639
640 class RpcRunner(_RpcClientBase,
641                 _generated_rpc.RpcClientDefault,
642                 _generated_rpc.RpcClientBootstrap,
643                 _generated_rpc.RpcClientDnsOnly,
644                 _generated_rpc.RpcClientConfig):
645   """RPC runner class.
646
647   """
648   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
649     """Initialized the RPC runner.
650
651     @type cfg: L{config.ConfigWriter}
652     @param cfg: Configuration
653     @type lock_monitor_cb: callable
654     @param lock_monitor_cb: Lock monitor callback
655
656     """
657     self._cfg = cfg
658
659     encoders = _ENCODERS.copy()
660
661     encoders.update({
662       # Encoders requiring configuration object
663       rpc_defs.ED_INST_DICT: self._InstDict,
664       rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
665       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
666
667       # Encoders annotating disk parameters
668       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
669       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
670
671       # Encoders with special requirements
672       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
673       })
674
675     # Resolver using configuration
676     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
677                               cfg.GetAllNodesInfo)
678
679     # Pylint doesn't recognize multiple inheritance properly, see
680     # <http://www.logilab.org/ticket/36586> and
681     # <http://www.logilab.org/ticket/35642>
682     # pylint: disable=W0233
683     _RpcClientBase.__init__(self, resolver, encoders.get,
684                             lock_monitor_cb=lock_monitor_cb,
685                             _req_process_fn=_req_process_fn)
686     _generated_rpc.RpcClientConfig.__init__(self)
687     _generated_rpc.RpcClientBootstrap.__init__(self)
688     _generated_rpc.RpcClientDnsOnly.__init__(self)
689     _generated_rpc.RpcClientDefault.__init__(self)
690
691   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
692     """Convert the given instance to a dict.
693
694     This is done via the instance's ToDict() method and additionally
695     we fill the hvparams with the cluster defaults.
696
697     @type instance: L{objects.Instance}
698     @param instance: an Instance object
699     @type hvp: dict or None
700     @param hvp: a dictionary with overridden hypervisor parameters
701     @type bep: dict or None
702     @param bep: a dictionary with overridden backend parameters
703     @type osp: dict or None
704     @param osp: a dictionary with overridden os parameters
705     @rtype: dict
706     @return: the instance dict, with the hvparams filled with the
707         cluster defaults
708
709     """
710     idict = instance.ToDict()
711     cluster = self._cfg.GetClusterInfo()
712     idict["hvparams"] = cluster.FillHV(instance)
713     if hvp is not None:
714       idict["hvparams"].update(hvp)
715     idict["beparams"] = cluster.FillBE(instance)
716     if bep is not None:
717       idict["beparams"].update(bep)
718     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
719     if osp is not None:
720       idict["osparams"].update(osp)
721     for nic in idict["nics"]:
722       nic["nicparams"] = objects.FillDict(
723         cluster.nicparams[constants.PP_DEFAULT],
724         nic["nicparams"])
725     idict["disks"] = self._DisksDictDP((instance.disks, instance))
726     return idict
727
728   def _InstDictHvpBepDp(self, (instance, hvp, bep)):
729     """Wrapper for L{_InstDict}.
730
731     """
732     return self._InstDict(instance, hvp=hvp, bep=bep)
733
734   def _InstDictOspDp(self, (instance, osparams)):
735     """Wrapper for L{_InstDict}.
736
737     """
738     return self._InstDict(instance, osp=osparams)
739
740   def _DisksDictDP(self, (disks, instance)):
741     """Wrapper for L{AnnotateDiskParams}.
742
743     """
744     diskparams = self._cfg.GetInstanceDiskParams(instance)
745     return [disk.ToDict()
746             for disk in AnnotateDiskParams(instance.disk_template,
747                                            disks, diskparams)]
748
749   def _SingleDiskDictDP(self, (disk, instance)):
750     """Wrapper for L{AnnotateDiskParams}.
751
752     """
753     (anno_disk,) = self._DisksDictDP(([disk], instance))
754     return anno_disk
755
756
757 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
758   """RPC wrappers for job queue.
759
760   """
761   def __init__(self, context, address_list):
762     """Initializes this class.
763
764     """
765     if address_list is None:
766       resolver = compat.partial(_SsconfResolver, True)
767     else:
768       # Caller provided an address list
769       resolver = _StaticResolver(address_list)
770
771     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
772                             lock_monitor_cb=context.glm.AddToLockMonitor)
773     _generated_rpc.RpcClientJobQueue.__init__(self)
774
775
776 class BootstrapRunner(_RpcClientBase,
777                       _generated_rpc.RpcClientBootstrap,
778                       _generated_rpc.RpcClientDnsOnly):
779   """RPC wrappers for bootstrapping.
780
781   """
782   def __init__(self):
783     """Initializes this class.
784
785     """
786     # Pylint doesn't recognize multiple inheritance properly, see
787     # <http://www.logilab.org/ticket/36586> and
788     # <http://www.logilab.org/ticket/35642>
789     # pylint: disable=W0233
790     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
791                             _ENCODERS.get)
792     _generated_rpc.RpcClientBootstrap.__init__(self)
793     _generated_rpc.RpcClientDnsOnly.__init__(self)
794
795
796 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
797   """RPC wrappers for calls using only DNS.
798
799   """
800   def __init__(self):
801     """Initialize this class.
802
803     """
804     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
805                             _ENCODERS.get)
806     _generated_rpc.RpcClientDnsOnly.__init__(self)
807
808
809 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
810   """RPC wrappers for L{config}.
811
812   """
813   def __init__(self, context, address_list, _req_process_fn=None,
814                _getents=None):
815     """Initializes this class.
816
817     """
818     if context:
819       lock_monitor_cb = context.glm.AddToLockMonitor
820     else:
821       lock_monitor_cb = None
822
823     if address_list is None:
824       resolver = compat.partial(_SsconfResolver, True)
825     else:
826       # Caller provided an address list
827       resolver = _StaticResolver(address_list)
828
829     encoders = _ENCODERS.copy()
830
831     encoders.update({
832       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
833       })
834
835     _RpcClientBase.__init__(self, resolver, encoders.get,
836                             lock_monitor_cb=lock_monitor_cb,
837                             _req_process_fn=_req_process_fn)
838     _generated_rpc.RpcClientConfig.__init__(self)