Fix the node powered field
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50 from ganeti import pathutils
51 from ganeti import vcluster
52
53 # Special module generated at build time
54 from ganeti import _generated_rpc
55
56 # pylint has a bug here, doesn't see this import
57 import ganeti.http.client  # pylint: disable=W0611
58
59
60 _RPC_CLIENT_HEADERS = [
61   "Content-type: %s" % http.HTTP_APP_JSON,
62   "Expect:",
63   ]
64
65 #: Special value to describe an offline host
66 _OFFLINE = object()
67
68
69 def Init():
70   """Initializes the module-global HTTP client manager.
71
72   Must be called before using any RPC function and while exactly one thread is
73   running.
74
75   """
76   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
77   # one thread running. This check is just a safety measure -- it doesn't
78   # cover all cases.
79   assert threading.activeCount() == 1, \
80          "Found more than one active thread when initializing pycURL"
81
82   logging.info("Using PycURL %s", pycurl.version)
83
84   pycurl.global_init(pycurl.GLOBAL_ALL)
85
86
87 def Shutdown():
88   """Stops the module-global HTTP client manager.
89
90   Must be called before quitting the program and while exactly one thread is
91   running.
92
93   """
94   pycurl.global_cleanup()
95
96
97 def _ConfigRpcCurl(curl):
98   noded_cert = str(pathutils.NODED_CERT_FILE)
99
100   curl.setopt(pycurl.FOLLOWLOCATION, False)
101   curl.setopt(pycurl.CAINFO, noded_cert)
102   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
103   curl.setopt(pycurl.SSL_VERIFYPEER, True)
104   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
105   curl.setopt(pycurl.SSLCERT, noded_cert)
106   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
107   curl.setopt(pycurl.SSLKEY, noded_cert)
108   curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
109
110
111 def RunWithRPC(fn):
112   """RPC-wrapper decorator.
113
114   When applied to a function, it runs it with the RPC system
115   initialized, and it shutsdown the system afterwards. This means the
116   function must be called without RPC being initialized.
117
118   """
119   def wrapper(*args, **kwargs):
120     Init()
121     try:
122       return fn(*args, **kwargs)
123     finally:
124       Shutdown()
125   return wrapper
126
127
128 def _Compress(data):
129   """Compresses a string for transport over RPC.
130
131   Small amounts of data are not compressed.
132
133   @type data: str
134   @param data: Data
135   @rtype: tuple
136   @return: Encoded data to send
137
138   """
139   # Small amounts of data are not compressed
140   if len(data) < 512:
141     return (constants.RPC_ENCODING_NONE, data)
142
143   # Compress with zlib and encode in base64
144   return (constants.RPC_ENCODING_ZLIB_BASE64,
145           base64.b64encode(zlib.compress(data, 3)))
146
147
148 class RpcResult(object):
149   """RPC Result class.
150
151   This class holds an RPC result. It is needed since in multi-node
152   calls we can't raise an exception just because one one out of many
153   failed, and therefore we use this class to encapsulate the result.
154
155   @ivar data: the data payload, for successful results, or None
156   @ivar call: the name of the RPC call
157   @ivar node: the name of the node to which we made the call
158   @ivar offline: whether the operation failed because the node was
159       offline, as opposed to actual failure; offline=True will always
160       imply failed=True, in order to allow simpler checking if
161       the user doesn't care about the exact failure mode
162   @ivar fail_msg: the error message if the call failed
163
164   """
165   def __init__(self, data=None, failed=False, offline=False,
166                call=None, node=None):
167     self.offline = offline
168     self.call = call
169     self.node = node
170
171     if offline:
172       self.fail_msg = "Node is marked offline"
173       self.data = self.payload = None
174     elif failed:
175       self.fail_msg = self._EnsureErr(data)
176       self.data = self.payload = None
177     else:
178       self.data = data
179       if not isinstance(self.data, (tuple, list)):
180         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
181                          type(self.data))
182         self.payload = None
183       elif len(data) != 2:
184         self.fail_msg = ("RPC layer error: invalid result length (%d), "
185                          "expected 2" % len(self.data))
186         self.payload = None
187       elif not self.data[0]:
188         self.fail_msg = self._EnsureErr(self.data[1])
189         self.payload = None
190       else:
191         # finally success
192         self.fail_msg = None
193         self.payload = data[1]
194
195     for attr_name in ["call", "data", "fail_msg",
196                       "node", "offline", "payload"]:
197       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
198
199   @staticmethod
200   def _EnsureErr(val):
201     """Helper to ensure we return a 'True' value for error."""
202     if val:
203       return val
204     else:
205       return "No error information"
206
207   def Raise(self, msg, prereq=False, ecode=None):
208     """If the result has failed, raise an OpExecError.
209
210     This is used so that LU code doesn't have to check for each
211     result, but instead can call this function.
212
213     """
214     if not self.fail_msg:
215       return
216
217     if not msg: # one could pass None for default message
218       msg = ("Call '%s' to node '%s' has failed: %s" %
219              (self.call, self.node, self.fail_msg))
220     else:
221       msg = "%s: %s" % (msg, self.fail_msg)
222     if prereq:
223       ec = errors.OpPrereqError
224     else:
225       ec = errors.OpExecError
226     if ecode is not None:
227       args = (msg, ecode)
228     else:
229       args = (msg, )
230     raise ec(*args) # pylint: disable=W0142
231
232
233 def _SsconfResolver(ssconf_ips, node_list, _,
234                     ssc=ssconf.SimpleStore,
235                     nslookup_fn=netutils.Hostname.GetIP):
236   """Return addresses for given node names.
237
238   @type ssconf_ips: bool
239   @param ssconf_ips: Use the ssconf IPs
240   @type node_list: list
241   @param node_list: List of node names
242   @type ssc: class
243   @param ssc: SimpleStore class that is used to obtain node->ip mappings
244   @type nslookup_fn: callable
245   @param nslookup_fn: function use to do NS lookup
246   @rtype: list of tuple; (string, string)
247   @return: List of tuples containing node name and IP address
248
249   """
250   ss = ssc()
251   family = ss.GetPrimaryIPFamily()
252
253   if ssconf_ips:
254     iplist = ss.GetNodePrimaryIPList()
255     ipmap = dict(entry.split() for entry in iplist)
256   else:
257     ipmap = {}
258
259   result = []
260   for node in node_list:
261     ip = ipmap.get(node)
262     if ip is None:
263       ip = nslookup_fn(node, family=family)
264     result.append((node, ip))
265
266   return result
267
268
269 class _StaticResolver:
270   def __init__(self, addresses):
271     """Initializes this class.
272
273     """
274     self._addresses = addresses
275
276   def __call__(self, hosts, _):
277     """Returns static addresses for hosts.
278
279     """
280     assert len(hosts) == len(self._addresses)
281     return zip(hosts, self._addresses)
282
283
284 def _CheckConfigNode(name, node, accept_offline_node):
285   """Checks if a node is online.
286
287   @type name: string
288   @param name: Node name
289   @type node: L{objects.Node} or None
290   @param node: Node object
291
292   """
293   if node is None:
294     # Depend on DNS for name resolution
295     ip = name
296   elif node.offline and not accept_offline_node:
297     ip = _OFFLINE
298   else:
299     ip = node.primary_ip
300   return (name, ip)
301
302
303 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
304   """Calculate node addresses using configuration.
305
306   """
307   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
308
309   assert accept_offline_node or opts is None, "Unknown option"
310
311   # Special case for single-host lookups
312   if len(hosts) == 1:
313     (name, ) = hosts
314     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
315   else:
316     all_nodes = all_nodes_fn()
317     return [_CheckConfigNode(name, all_nodes.get(name, None),
318                              accept_offline_node)
319             for name in hosts]
320
321
322 class _RpcProcessor:
323   def __init__(self, resolver, port, lock_monitor_cb=None):
324     """Initializes this class.
325
326     @param resolver: callable accepting a list of hostnames, returning a list
327       of tuples containing name and IP address (IP address can be the name or
328       the special value L{_OFFLINE} to mark offline machines)
329     @type port: int
330     @param port: TCP port
331     @param lock_monitor_cb: Callable for registering with lock monitor
332
333     """
334     self._resolver = resolver
335     self._port = port
336     self._lock_monitor_cb = lock_monitor_cb
337
338   @staticmethod
339   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
340     """Prepares requests by sorting offline hosts into separate list.
341
342     @type body: dict
343     @param body: a dictionary with per-host body data
344
345     """
346     results = {}
347     requests = {}
348
349     assert isinstance(body, dict)
350     assert len(body) == len(hosts)
351     assert compat.all(isinstance(v, str) for v in body.values())
352     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
353         "%s != %s" % (hosts, body.keys())
354
355     for (name, ip) in hosts:
356       if ip is _OFFLINE:
357         # Node is marked as offline
358         results[name] = RpcResult(node=name, offline=True, call=procedure)
359       else:
360         requests[name] = \
361           http.client.HttpClientRequest(str(ip), port,
362                                         http.HTTP_POST, str("/%s" % procedure),
363                                         headers=_RPC_CLIENT_HEADERS,
364                                         post_data=body[name],
365                                         read_timeout=read_timeout,
366                                         nicename="%s/%s" % (name, procedure),
367                                         curl_config_fn=_ConfigRpcCurl)
368
369     return (results, requests)
370
371   @staticmethod
372   def _CombineResults(results, requests, procedure):
373     """Combines pre-computed results for offline hosts with actual call results.
374
375     """
376     for name, req in requests.items():
377       if req.success and req.resp_status_code == http.HTTP_OK:
378         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
379                                 node=name, call=procedure)
380       else:
381         # TODO: Better error reporting
382         if req.error:
383           msg = req.error
384         else:
385           msg = req.resp_body
386
387         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
388         host_result = RpcResult(data=msg, failed=True, node=name,
389                                 call=procedure)
390
391       results[name] = host_result
392
393     return results
394
395   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
396                _req_process_fn=None):
397     """Makes an RPC request to a number of nodes.
398
399     @type hosts: sequence
400     @param hosts: Hostnames
401     @type procedure: string
402     @param procedure: Request path
403     @type body: dictionary
404     @param body: dictionary with request bodies per host
405     @type read_timeout: int or None
406     @param read_timeout: Read timeout for request
407
408     """
409     assert read_timeout is not None, \
410       "Missing RPC read timeout for procedure '%s'" % procedure
411
412     if _req_process_fn is None:
413       _req_process_fn = http.client.ProcessRequests
414
415     (results, requests) = \
416       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
417                             procedure, body, read_timeout)
418
419     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
420
421     assert not frozenset(results).intersection(requests)
422
423     return self._CombineResults(results, requests, procedure)
424
425
426 class _RpcClientBase:
427   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
428                _req_process_fn=None):
429     """Initializes this class.
430
431     """
432     proc = _RpcProcessor(resolver,
433                          netutils.GetDaemonPort(constants.NODED),
434                          lock_monitor_cb=lock_monitor_cb)
435     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
436     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
437
438   @staticmethod
439   def _EncodeArg(encoder_fn, (argkind, value)):
440     """Encode argument.
441
442     """
443     if argkind is None:
444       return value
445     else:
446       return encoder_fn(argkind)(value)
447
448   def _Call(self, cdef, node_list, args):
449     """Entry point for automatically generated RPC wrappers.
450
451     """
452     (procedure, _, resolver_opts, timeout, argdefs,
453      prep_fn, postproc_fn, _) = cdef
454
455     if callable(timeout):
456       read_timeout = timeout(args)
457     else:
458       read_timeout = timeout
459
460     if callable(resolver_opts):
461       req_resolver_opts = resolver_opts(args)
462     else:
463       req_resolver_opts = resolver_opts
464
465     if len(args) != len(argdefs):
466       raise errors.ProgrammerError("Number of passed arguments doesn't match")
467
468     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
469     if prep_fn is None:
470       # for a no-op prep_fn, we serialise the body once, and then we
471       # reuse it in the dictionary values
472       body = serializer.DumpJson(enc_args)
473       pnbody = dict((n, body) for n in node_list)
474     else:
475       # for a custom prep_fn, we pass the encoded arguments and the
476       # node name to the prep_fn, and we serialise its return value
477       assert callable(prep_fn)
478       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
479                     for n in node_list)
480
481     result = self._proc(node_list, procedure, pnbody, read_timeout,
482                         req_resolver_opts)
483
484     if postproc_fn:
485       return dict(map(lambda (key, value): (key, postproc_fn(value)),
486                       result.items()))
487     else:
488       return result
489
490
491 def _ObjectToDict(value):
492   """Converts an object to a dictionary.
493
494   @note: See L{objects}.
495
496   """
497   return value.ToDict()
498
499
500 def _ObjectListToDict(value):
501   """Converts a list of L{objects} to dictionaries.
502
503   """
504   return map(_ObjectToDict, value)
505
506
507 def _EncodeNodeToDiskDict(value):
508   """Encodes a dictionary with node name as key and disk objects as values.
509
510   """
511   return dict((name, _ObjectListToDict(disks))
512               for name, disks in value.items())
513
514
515 def _PrepareFileUpload(getents_fn, filename):
516   """Loads a file and prepares it for an upload to nodes.
517
518   """
519   statcb = utils.FileStatHelper()
520   data = _Compress(utils.ReadFile(filename, preread=statcb))
521   st = statcb.st
522
523   if getents_fn is None:
524     getents_fn = runtime.GetEnts
525
526   getents = getents_fn()
527
528   virt_filename = vcluster.MakeVirtualPath(filename)
529
530   return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
531           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
532
533
534 def _PrepareFinalizeExportDisks(snap_disks):
535   """Encodes disks for finalizing export.
536
537   """
538   flat_disks = []
539
540   for disk in snap_disks:
541     if isinstance(disk, bool):
542       flat_disks.append(disk)
543     else:
544       flat_disks.append(disk.ToDict())
545
546   return flat_disks
547
548
549 def _EncodeImportExportIO((ieio, ieioargs)):
550   """Encodes import/export I/O information.
551
552   """
553   if ieio == constants.IEIO_RAW_DISK:
554     assert len(ieioargs) == 1
555     return (ieio, (ieioargs[0].ToDict(), ))
556
557   if ieio == constants.IEIO_SCRIPT:
558     assert len(ieioargs) == 2
559     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
560
561   return (ieio, ieioargs)
562
563
564 def _EncodeBlockdevRename(value):
565   """Encodes information for renaming block devices.
566
567   """
568   return [(d.ToDict(), uid) for d, uid in value]
569
570
571 def MakeLegacyNodeInfo(data):
572   """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
573
574   Converts the data into a single dictionary. This is fine for most use cases,
575   but some require information from more than one volume group or hypervisor.
576
577   """
578   (bootid, (vg_info, ), (hv_info, )) = data
579
580   return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
581     "bootid": bootid,
582     })
583
584
585 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
586   """Annotates just DRBD disks layouts.
587
588   """
589   assert disk.dev_type == constants.LD_DRBD8
590
591   disk.params = objects.FillDict(drbd_params, disk.params)
592   (dev_data, dev_meta) = disk.children
593   dev_data.params = objects.FillDict(data_params, dev_data.params)
594   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
595
596   return disk
597
598
599 def _AnnotateDParamsGeneric(disk, (params, )):
600   """Generic disk parameter annotation routine.
601
602   """
603   assert disk.dev_type != constants.LD_DRBD8
604
605   disk.params = objects.FillDict(params, disk.params)
606
607   return disk
608
609
610 def AnnotateDiskParams(template, disks, disk_params):
611   """Annotates the disk objects with the disk parameters.
612
613   @param template: The disk template used
614   @param disks: The list of disks objects to annotate
615   @param disk_params: The disk paramaters for annotation
616   @returns: A list of disk objects annotated
617
618   """
619   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
620
621   if template == constants.DT_DRBD8:
622     annotation_fn = _AnnotateDParamsDRBD
623   elif template == constants.DT_DISKLESS:
624     annotation_fn = lambda disk, _: disk
625   else:
626     annotation_fn = _AnnotateDParamsGeneric
627
628   return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
629
630
631 #: Generic encoders
632 _ENCODERS = {
633   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
634   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
635   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
636   rpc_defs.ED_COMPRESS: _Compress,
637   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
638   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
639   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
640   }
641
642
643 class RpcRunner(_RpcClientBase,
644                 _generated_rpc.RpcClientDefault,
645                 _generated_rpc.RpcClientBootstrap,
646                 _generated_rpc.RpcClientDnsOnly,
647                 _generated_rpc.RpcClientConfig):
648   """RPC runner class.
649
650   """
651   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
652     """Initialized the RPC runner.
653
654     @type cfg: L{config.ConfigWriter}
655     @param cfg: Configuration
656     @type lock_monitor_cb: callable
657     @param lock_monitor_cb: Lock monitor callback
658
659     """
660     self._cfg = cfg
661
662     encoders = _ENCODERS.copy()
663
664     encoders.update({
665       # Encoders requiring configuration object
666       rpc_defs.ED_INST_DICT: self._InstDict,
667       rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
668       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
669
670       # Encoders annotating disk parameters
671       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
672       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
673
674       # Encoders with special requirements
675       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
676       })
677
678     # Resolver using configuration
679     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
680                               cfg.GetAllNodesInfo)
681
682     # Pylint doesn't recognize multiple inheritance properly, see
683     # <http://www.logilab.org/ticket/36586> and
684     # <http://www.logilab.org/ticket/35642>
685     # pylint: disable=W0233
686     _RpcClientBase.__init__(self, resolver, encoders.get,
687                             lock_monitor_cb=lock_monitor_cb,
688                             _req_process_fn=_req_process_fn)
689     _generated_rpc.RpcClientConfig.__init__(self)
690     _generated_rpc.RpcClientBootstrap.__init__(self)
691     _generated_rpc.RpcClientDnsOnly.__init__(self)
692     _generated_rpc.RpcClientDefault.__init__(self)
693
694   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
695     """Convert the given instance to a dict.
696
697     This is done via the instance's ToDict() method and additionally
698     we fill the hvparams with the cluster defaults.
699
700     @type instance: L{objects.Instance}
701     @param instance: an Instance object
702     @type hvp: dict or None
703     @param hvp: a dictionary with overridden hypervisor parameters
704     @type bep: dict or None
705     @param bep: a dictionary with overridden backend parameters
706     @type osp: dict or None
707     @param osp: a dictionary with overridden os parameters
708     @rtype: dict
709     @return: the instance dict, with the hvparams filled with the
710         cluster defaults
711
712     """
713     idict = instance.ToDict()
714     cluster = self._cfg.GetClusterInfo()
715     idict["hvparams"] = cluster.FillHV(instance)
716     if hvp is not None:
717       idict["hvparams"].update(hvp)
718     idict["beparams"] = cluster.FillBE(instance)
719     if bep is not None:
720       idict["beparams"].update(bep)
721     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
722     if osp is not None:
723       idict["osparams"].update(osp)
724     for nic in idict["nics"]:
725       nic["nicparams"] = objects.FillDict(
726         cluster.nicparams[constants.PP_DEFAULT],
727         nic["nicparams"])
728     idict["disks"] = self._DisksDictDP((instance.disks, instance))
729     return idict
730
731   def _InstDictHvpBepDp(self, (instance, hvp, bep)):
732     """Wrapper for L{_InstDict}.
733
734     """
735     return self._InstDict(instance, hvp=hvp, bep=bep)
736
737   def _InstDictOspDp(self, (instance, osparams)):
738     """Wrapper for L{_InstDict}.
739
740     """
741     return self._InstDict(instance, osp=osparams)
742
743   def _DisksDictDP(self, (disks, instance)):
744     """Wrapper for L{AnnotateDiskParams}.
745
746     """
747     diskparams = self._cfg.GetInstanceDiskParams(instance)
748     return [disk.ToDict()
749             for disk in AnnotateDiskParams(instance.disk_template,
750                                            disks, diskparams)]
751
752   def _SingleDiskDictDP(self, (disk, instance)):
753     """Wrapper for L{AnnotateDiskParams}.
754
755     """
756     (anno_disk,) = self._DisksDictDP(([disk], instance))
757     return anno_disk
758
759
760 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
761   """RPC wrappers for job queue.
762
763   """
764   def __init__(self, context, address_list):
765     """Initializes this class.
766
767     """
768     if address_list is None:
769       resolver = compat.partial(_SsconfResolver, True)
770     else:
771       # Caller provided an address list
772       resolver = _StaticResolver(address_list)
773
774     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
775                             lock_monitor_cb=context.glm.AddToLockMonitor)
776     _generated_rpc.RpcClientJobQueue.__init__(self)
777
778
779 class BootstrapRunner(_RpcClientBase,
780                       _generated_rpc.RpcClientBootstrap,
781                       _generated_rpc.RpcClientDnsOnly):
782   """RPC wrappers for bootstrapping.
783
784   """
785   def __init__(self):
786     """Initializes this class.
787
788     """
789     # Pylint doesn't recognize multiple inheritance properly, see
790     # <http://www.logilab.org/ticket/36586> and
791     # <http://www.logilab.org/ticket/35642>
792     # pylint: disable=W0233
793     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
794                             _ENCODERS.get)
795     _generated_rpc.RpcClientBootstrap.__init__(self)
796     _generated_rpc.RpcClientDnsOnly.__init__(self)
797
798
799 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
800   """RPC wrappers for calls using only DNS.
801
802   """
803   def __init__(self):
804     """Initialize this class.
805
806     """
807     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
808                             _ENCODERS.get)
809     _generated_rpc.RpcClientDnsOnly.__init__(self)
810
811
812 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
813   """RPC wrappers for L{config}.
814
815   """
816   def __init__(self, context, address_list, _req_process_fn=None,
817                _getents=None):
818     """Initializes this class.
819
820     """
821     if context:
822       lock_monitor_cb = context.glm.AddToLockMonitor
823     else:
824       lock_monitor_cb = None
825
826     if address_list is None:
827       resolver = compat.partial(_SsconfResolver, True)
828     else:
829       # Caller provided an address list
830       resolver = _StaticResolver(address_list)
831
832     encoders = _ENCODERS.copy()
833
834     encoders.update({
835       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
836       })
837
838     _RpcClientBase.__init__(self, resolver, encoders.get,
839                             lock_monitor_cb=lock_monitor_cb,
840                             _req_process_fn=_req_process_fn)
841     _generated_rpc.RpcClientConfig.__init__(self)