Merge branch 'stable-2.6-hotplug' into stable-2.6-ippool-hotplug-esi
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38 import copy
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51
52 # Special module generated at build time
53 from ganeti import _generated_rpc
54
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client  # pylint: disable=W0611
57
58
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
61
62 _RPC_CLIENT_HEADERS = [
63   "Content-type: %s" % http.HTTP_APP_JSON,
64   "Expect:",
65   ]
66
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
72 _TMO_4HRS = 4 * 3600
73 _TMO_1DAY = 86400
74
75 #: Special value to describe an offline host
76 _OFFLINE = object()
77
78
79 def Init():
80   """Initializes the module-global HTTP client manager.
81
82   Must be called before using any RPC function and while exactly one thread is
83   running.
84
85   """
86   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87   # one thread running. This check is just a safety measure -- it doesn't
88   # cover all cases.
89   assert threading.activeCount() == 1, \
90          "Found more than one active thread when initializing pycURL"
91
92   logging.info("Using PycURL %s", pycurl.version)
93
94   pycurl.global_init(pycurl.GLOBAL_ALL)
95
96
97 def Shutdown():
98   """Stops the module-global HTTP client manager.
99
100   Must be called before quitting the program and while exactly one thread is
101   running.
102
103   """
104   pycurl.global_cleanup()
105
106
107 def _ConfigRpcCurl(curl):
108   noded_cert = str(constants.NODED_CERT_FILE)
109
110   curl.setopt(pycurl.FOLLOWLOCATION, False)
111   curl.setopt(pycurl.CAINFO, noded_cert)
112   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113   curl.setopt(pycurl.SSL_VERIFYPEER, True)
114   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115   curl.setopt(pycurl.SSLCERT, noded_cert)
116   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117   curl.setopt(pycurl.SSLKEY, noded_cert)
118   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
120
121 def RunWithRPC(fn):
122   """RPC-wrapper decorator.
123
124   When applied to a function, it runs it with the RPC system
125   initialized, and it shutsdown the system afterwards. This means the
126   function must be called without RPC being initialized.
127
128   """
129   def wrapper(*args, **kwargs):
130     Init()
131     try:
132       return fn(*args, **kwargs)
133     finally:
134       Shutdown()
135   return wrapper
136
137
138 def _Compress(data):
139   """Compresses a string for transport over RPC.
140
141   Small amounts of data are not compressed.
142
143   @type data: str
144   @param data: Data
145   @rtype: tuple
146   @return: Encoded data to send
147
148   """
149   # Small amounts of data are not compressed
150   if len(data) < 512:
151     return (constants.RPC_ENCODING_NONE, data)
152
153   # Compress with zlib and encode in base64
154   return (constants.RPC_ENCODING_ZLIB_BASE64,
155           base64.b64encode(zlib.compress(data, 3)))
156
157
158 class RpcResult(object):
159   """RPC Result class.
160
161   This class holds an RPC result. It is needed since in multi-node
162   calls we can't raise an exception just because one one out of many
163   failed, and therefore we use this class to encapsulate the result.
164
165   @ivar data: the data payload, for successful results, or None
166   @ivar call: the name of the RPC call
167   @ivar node: the name of the node to which we made the call
168   @ivar offline: whether the operation failed because the node was
169       offline, as opposed to actual failure; offline=True will always
170       imply failed=True, in order to allow simpler checking if
171       the user doesn't care about the exact failure mode
172   @ivar fail_msg: the error message if the call failed
173
174   """
175   def __init__(self, data=None, failed=False, offline=False,
176                call=None, node=None):
177     self.offline = offline
178     self.call = call
179     self.node = node
180
181     if offline:
182       self.fail_msg = "Node is marked offline"
183       self.data = self.payload = None
184     elif failed:
185       self.fail_msg = self._EnsureErr(data)
186       self.data = self.payload = None
187     else:
188       self.data = data
189       if not isinstance(self.data, (tuple, list)):
190         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
191                          type(self.data))
192         self.payload = None
193       elif len(data) != 2:
194         self.fail_msg = ("RPC layer error: invalid result length (%d), "
195                          "expected 2" % len(self.data))
196         self.payload = None
197       elif not self.data[0]:
198         self.fail_msg = self._EnsureErr(self.data[1])
199         self.payload = None
200       else:
201         # finally success
202         self.fail_msg = None
203         self.payload = data[1]
204
205     for attr_name in ["call", "data", "fail_msg",
206                       "node", "offline", "payload"]:
207       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
208
209   @staticmethod
210   def _EnsureErr(val):
211     """Helper to ensure we return a 'True' value for error."""
212     if val:
213       return val
214     else:
215       return "No error information"
216
217   def Raise(self, msg, prereq=False, ecode=None):
218     """If the result has failed, raise an OpExecError.
219
220     This is used so that LU code doesn't have to check for each
221     result, but instead can call this function.
222
223     """
224     if not self.fail_msg:
225       return
226
227     if not msg: # one could pass None for default message
228       msg = ("Call '%s' to node '%s' has failed: %s" %
229              (self.call, self.node, self.fail_msg))
230     else:
231       msg = "%s: %s" % (msg, self.fail_msg)
232     if prereq:
233       ec = errors.OpPrereqError
234     else:
235       ec = errors.OpExecError
236     if ecode is not None:
237       args = (msg, ecode)
238     else:
239       args = (msg, )
240     raise ec(*args) # pylint: disable=W0142
241
242
243 def _SsconfResolver(ssconf_ips, node_list, _,
244                     ssc=ssconf.SimpleStore,
245                     nslookup_fn=netutils.Hostname.GetIP):
246   """Return addresses for given node names.
247
248   @type ssconf_ips: bool
249   @param ssconf_ips: Use the ssconf IPs
250   @type node_list: list
251   @param node_list: List of node names
252   @type ssc: class
253   @param ssc: SimpleStore class that is used to obtain node->ip mappings
254   @type nslookup_fn: callable
255   @param nslookup_fn: function use to do NS lookup
256   @rtype: list of tuple; (string, string)
257   @return: List of tuples containing node name and IP address
258
259   """
260   ss = ssc()
261   family = ss.GetPrimaryIPFamily()
262
263   if ssconf_ips:
264     iplist = ss.GetNodePrimaryIPList()
265     ipmap = dict(entry.split() for entry in iplist)
266   else:
267     ipmap = {}
268
269   result = []
270   for node in node_list:
271     ip = ipmap.get(node)
272     if ip is None:
273       ip = nslookup_fn(node, family=family)
274     result.append((node, ip))
275
276   return result
277
278
279 class _StaticResolver:
280   def __init__(self, addresses):
281     """Initializes this class.
282
283     """
284     self._addresses = addresses
285
286   def __call__(self, hosts, _):
287     """Returns static addresses for hosts.
288
289     """
290     assert len(hosts) == len(self._addresses)
291     return zip(hosts, self._addresses)
292
293
294 def _CheckConfigNode(name, node, accept_offline_node):
295   """Checks if a node is online.
296
297   @type name: string
298   @param name: Node name
299   @type node: L{objects.Node} or None
300   @param node: Node object
301
302   """
303   if node is None:
304     # Depend on DNS for name resolution
305     ip = name
306   elif node.offline and not accept_offline_node:
307     ip = _OFFLINE
308   else:
309     ip = node.primary_ip
310   return (name, ip)
311
312
313 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
314   """Calculate node addresses using configuration.
315
316   """
317   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
318
319   assert accept_offline_node or opts is None, "Unknown option"
320
321   # Special case for single-host lookups
322   if len(hosts) == 1:
323     (name, ) = hosts
324     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
325   else:
326     all_nodes = all_nodes_fn()
327     return [_CheckConfigNode(name, all_nodes.get(name, None),
328                              accept_offline_node)
329             for name in hosts]
330
331
332 class _RpcProcessor:
333   def __init__(self, resolver, port, lock_monitor_cb=None):
334     """Initializes this class.
335
336     @param resolver: callable accepting a list of hostnames, returning a list
337       of tuples containing name and IP address (IP address can be the name or
338       the special value L{_OFFLINE} to mark offline machines)
339     @type port: int
340     @param port: TCP port
341     @param lock_monitor_cb: Callable for registering with lock monitor
342
343     """
344     self._resolver = resolver
345     self._port = port
346     self._lock_monitor_cb = lock_monitor_cb
347
348   @staticmethod
349   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
350     """Prepares requests by sorting offline hosts into separate list.
351
352     @type body: dict
353     @param body: a dictionary with per-host body data
354
355     """
356     results = {}
357     requests = {}
358
359     assert isinstance(body, dict)
360     assert len(body) == len(hosts)
361     assert compat.all(isinstance(v, str) for v in body.values())
362     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
363         "%s != %s" % (hosts, body.keys())
364
365     for (name, ip) in hosts:
366       if ip is _OFFLINE:
367         # Node is marked as offline
368         results[name] = RpcResult(node=name, offline=True, call=procedure)
369       else:
370         requests[name] = \
371           http.client.HttpClientRequest(str(ip), port,
372                                         http.HTTP_POST, str("/%s" % procedure),
373                                         headers=_RPC_CLIENT_HEADERS,
374                                         post_data=body[name],
375                                         read_timeout=read_timeout,
376                                         nicename="%s/%s" % (name, procedure),
377                                         curl_config_fn=_ConfigRpcCurl)
378
379     return (results, requests)
380
381   @staticmethod
382   def _CombineResults(results, requests, procedure):
383     """Combines pre-computed results for offline hosts with actual call results.
384
385     """
386     for name, req in requests.items():
387       if req.success and req.resp_status_code == http.HTTP_OK:
388         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
389                                 node=name, call=procedure)
390       else:
391         # TODO: Better error reporting
392         if req.error:
393           msg = req.error
394         else:
395           msg = req.resp_body
396
397         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
398         host_result = RpcResult(data=msg, failed=True, node=name,
399                                 call=procedure)
400
401       results[name] = host_result
402
403     return results
404
405   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
406                _req_process_fn=None):
407     """Makes an RPC request to a number of nodes.
408
409     @type hosts: sequence
410     @param hosts: Hostnames
411     @type procedure: string
412     @param procedure: Request path
413     @type body: dictionary
414     @param body: dictionary with request bodies per host
415     @type read_timeout: int or None
416     @param read_timeout: Read timeout for request
417
418     """
419     assert read_timeout is not None, \
420       "Missing RPC read timeout for procedure '%s'" % procedure
421
422     if _req_process_fn is None:
423       _req_process_fn = http.client.ProcessRequests
424
425     (results, requests) = \
426       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
427                             procedure, body, read_timeout)
428
429     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
430
431     assert not frozenset(results).intersection(requests)
432
433     return self._CombineResults(results, requests, procedure)
434
435
436 class _RpcClientBase:
437   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
438                _req_process_fn=None):
439     """Initializes this class.
440
441     """
442     proc = _RpcProcessor(resolver,
443                          netutils.GetDaemonPort(constants.NODED),
444                          lock_monitor_cb=lock_monitor_cb)
445     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
446     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
447
448   @staticmethod
449   def _EncodeArg(encoder_fn, (argkind, value)):
450     """Encode argument.
451
452     """
453     if argkind is None:
454       return value
455     else:
456       return encoder_fn(argkind)(value)
457
458   def _Call(self, cdef, node_list, args):
459     """Entry point for automatically generated RPC wrappers.
460
461     """
462     (procedure, _, resolver_opts, timeout, argdefs,
463      prep_fn, postproc_fn, _) = cdef
464
465     if callable(timeout):
466       read_timeout = timeout(args)
467     else:
468       read_timeout = timeout
469
470     if callable(resolver_opts):
471       req_resolver_opts = resolver_opts(args)
472     else:
473       req_resolver_opts = resolver_opts
474
475     if len(args) != len(argdefs):
476       raise errors.ProgrammerError("Number of passed arguments doesn't match")
477
478     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
479     if prep_fn is None:
480       # for a no-op prep_fn, we serialise the body once, and then we
481       # reuse it in the dictionary values
482       body = serializer.DumpJson(enc_args)
483       pnbody = dict((n, body) for n in node_list)
484     else:
485       # for a custom prep_fn, we pass the encoded arguments and the
486       # node name to the prep_fn, and we serialise its return value
487       assert callable(prep_fn)
488       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
489                     for n in node_list)
490
491     result = self._proc(node_list, procedure, pnbody, read_timeout,
492                         req_resolver_opts)
493
494     if postproc_fn:
495       return dict(map(lambda (key, value): (key, postproc_fn(value)),
496                       result.items()))
497     else:
498       return result
499
500
501 def _ObjectToDict(value):
502   """Converts an object to a dictionary.
503
504   @note: See L{objects}.
505
506   """
507   return value.ToDict()
508
509
510 def _ObjectListToDict(value):
511   """Converts a list of L{objects} to dictionaries.
512
513   """
514   return map(_ObjectToDict, value)
515
516
517 def _EncodeNodeToDiskDict(value):
518   """Encodes a dictionary with node name as key and disk objects as values.
519
520   """
521   return dict((name, _ObjectListToDict(disks))
522               for name, disks in value.items())
523
524
525 def _PrepareFileUpload(getents_fn, filename):
526   """Loads a file and prepares it for an upload to nodes.
527
528   """
529   statcb = utils.FileStatHelper()
530   data = _Compress(utils.ReadFile(filename, preread=statcb))
531   st = statcb.st
532
533   if getents_fn is None:
534     getents_fn = runtime.GetEnts
535
536   getents = getents_fn()
537
538   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
539           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
540
541
542 def _PrepareFinalizeExportDisks(snap_disks):
543   """Encodes disks for finalizing export.
544
545   """
546   flat_disks = []
547
548   for disk in snap_disks:
549     if isinstance(disk, bool):
550       flat_disks.append(disk)
551     else:
552       flat_disks.append(disk.ToDict())
553
554   return flat_disks
555
556
557 def _EncodeImportExportIO((ieio, ieioargs)):
558   """Encodes import/export I/O information.
559
560   """
561   if ieio == constants.IEIO_RAW_DISK:
562     assert len(ieioargs) == 1
563     return (ieio, (ieioargs[0].ToDict(), ))
564
565   if ieio == constants.IEIO_SCRIPT:
566     assert len(ieioargs) == 2
567     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
568
569   return (ieio, ieioargs)
570
571
572 def _EncodeBlockdevRename(value):
573   """Encodes information for renaming block devices.
574
575   """
576   return [(d.ToDict(), uid) for d, uid in value]
577
578
579 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
580   """Annotates just DRBD disks layouts.
581
582   """
583   assert disk.dev_type == constants.LD_DRBD8
584
585   disk.params = objects.FillDict(drbd_params, disk.params)
586   (dev_data, dev_meta) = disk.children
587   dev_data.params = objects.FillDict(data_params, dev_data.params)
588   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
589
590   return disk
591
592
593 def _AnnotateDParamsGeneric(disk, (params, )):
594   """Generic disk parameter annotation routine.
595
596   """
597   assert disk.dev_type != constants.LD_DRBD8
598
599   disk.params = objects.FillDict(params, disk.params)
600
601   return disk
602
603
604 def AnnotateDiskParams(template, disks, disk_params):
605   """Annotates the disk objects with the disk parameters.
606
607   @param template: The disk template used
608   @param disks: The list of disks objects to annotate
609   @param disk_params: The disk paramaters for annotation
610   @returns: A list of disk objects annotated
611
612   """
613   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
614
615   if template == constants.DT_DRBD8:
616     annotation_fn = _AnnotateDParamsDRBD
617   elif template == constants.DT_DISKLESS:
618     annotation_fn = lambda disk, _: disk
619   else:
620     annotation_fn = _AnnotateDParamsGeneric
621
622   new_disks = []
623   for disk in disks:
624     new_disks.append(annotation_fn(disk.Copy(), ld_params))
625
626   return new_disks
627
628
629 #: Generic encoders
630 _ENCODERS = {
631   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
632   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
633   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
634   rpc_defs.ED_COMPRESS: _Compress,
635   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
636   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
637   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
638   }
639
640
641 class RpcRunner(_RpcClientBase,
642                 _generated_rpc.RpcClientDefault,
643                 _generated_rpc.RpcClientBootstrap,
644                 _generated_rpc.RpcClientDnsOnly,
645                 _generated_rpc.RpcClientConfig):
646   """RPC runner class.
647
648   """
649   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
650     """Initialized the RPC runner.
651
652     @type cfg: L{config.ConfigWriter}
653     @param cfg: Configuration
654     @type lock_monitor_cb: callable
655     @param lock_monitor_cb: Lock monitor callback
656
657     """
658     self._cfg = cfg
659
660     encoders = _ENCODERS.copy()
661
662     encoders.update({
663       # Encoders requiring configuration object
664       rpc_defs.ED_INST_DICT: self._InstDict,
665       rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
666       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
667       rpc_defs.ED_NIC_DICT: self._NicDict,
668
669       # Encoders annotating disk parameters
670       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
671       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
672
673       # Encoders with special requirements
674       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
675       })
676
677     # Resolver using configuration
678     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
679                               cfg.GetAllNodesInfo)
680
681     # Pylint doesn't recognize multiple inheritance properly, see
682     # <http://www.logilab.org/ticket/36586> and
683     # <http://www.logilab.org/ticket/35642>
684     # pylint: disable=W0233
685     _RpcClientBase.__init__(self, resolver, encoders.get,
686                             lock_monitor_cb=lock_monitor_cb,
687                             _req_process_fn=_req_process_fn)
688     _generated_rpc.RpcClientConfig.__init__(self)
689     _generated_rpc.RpcClientBootstrap.__init__(self)
690     _generated_rpc.RpcClientDnsOnly.__init__(self)
691     _generated_rpc.RpcClientDefault.__init__(self)
692
693   def _NicDict(self, nic):
694     """Convert the given nic to a dict and encapsulate netinfo
695
696     """
697     n = copy.deepcopy(nic)
698     if n.network:
699       net_uuid = self._cfg.LookupNetwork(n.network)
700       if net_uuid:
701         nobj = self._cfg.GetNetwork(net_uuid)
702         n.netinfo = objects.Network.ToDict(nobj)
703     return n.ToDict()
704
705   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
706     """Convert the given instance to a dict.
707
708     This is done via the instance's ToDict() method and additionally
709     we fill the hvparams with the cluster defaults.
710
711     @type instance: L{objects.Instance}
712     @param instance: an Instance object
713     @type hvp: dict or None
714     @param hvp: a dictionary with overridden hypervisor parameters
715     @type bep: dict or None
716     @param bep: a dictionary with overridden backend parameters
717     @type osp: dict or None
718     @param osp: a dictionary with overridden os parameters
719     @rtype: dict
720     @return: the instance dict, with the hvparams filled with the
721         cluster defaults
722
723     """
724     idict = instance.ToDict()
725     cluster = self._cfg.GetClusterInfo()
726     idict["hvparams"] = cluster.FillHV(instance)
727     if hvp is not None:
728       idict["hvparams"].update(hvp)
729     idict["beparams"] = cluster.FillBE(instance)
730     if bep is not None:
731       idict["beparams"].update(bep)
732     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
733     if osp is not None:
734       idict["osparams"].update(osp)
735     for nic in idict["nics"]:
736       nic["nicparams"] = objects.FillDict(
737         cluster.nicparams[constants.PP_DEFAULT],
738         nic["nicparams"])
739       network = nic.get("network", None)
740       if network:
741         net_uuid = self._cfg.LookupNetwork(network)
742         if net_uuid:
743           nobj = self._cfg.GetNetwork(net_uuid)
744           nic["netinfo"] = objects.Network.ToDict(nobj)
745     idict["disks"] = self._DisksDictDP((instance.disks, instance))
746     return idict
747
748   def _InstDictHvpBepDp(self, (instance, hvp, bep)):
749     """Wrapper for L{_InstDict}.
750
751     """
752     return self._InstDict(instance, hvp=hvp, bep=bep)
753
754   def _InstDictOspDp(self, (instance, osparams)):
755     """Wrapper for L{_InstDict}.
756
757     """
758     return self._InstDict(instance, osp=osparams)
759
760   def _DisksDictDP(self, (disks, instance)):
761     """Wrapper for L{AnnotateDiskParams}.
762
763     """
764     diskparams = self._cfg.GetInstanceDiskParams(instance)
765     return [disk.ToDict()
766             for disk in AnnotateDiskParams(instance.disk_template,
767                                            disks, diskparams)]
768
769   def _SingleDiskDictDP(self, (disk, instance)):
770     """Wrapper for L{AnnotateDiskParams}.
771
772     """
773     (anno_disk,) = self._DisksDictDP(([disk], instance))
774     return anno_disk
775
776
777 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
778   """RPC wrappers for job queue.
779
780   """
781   def __init__(self, context, address_list):
782     """Initializes this class.
783
784     """
785     if address_list is None:
786       resolver = compat.partial(_SsconfResolver, True)
787     else:
788       # Caller provided an address list
789       resolver = _StaticResolver(address_list)
790
791     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
792                             lock_monitor_cb=context.glm.AddToLockMonitor)
793     _generated_rpc.RpcClientJobQueue.__init__(self)
794
795
796 class BootstrapRunner(_RpcClientBase,
797                       _generated_rpc.RpcClientBootstrap,
798                       _generated_rpc.RpcClientDnsOnly):
799   """RPC wrappers for bootstrapping.
800
801   """
802   def __init__(self):
803     """Initializes this class.
804
805     """
806     # Pylint doesn't recognize multiple inheritance properly, see
807     # <http://www.logilab.org/ticket/36586> and
808     # <http://www.logilab.org/ticket/35642>
809     # pylint: disable=W0233
810     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
811                             _ENCODERS.get)
812     _generated_rpc.RpcClientBootstrap.__init__(self)
813     _generated_rpc.RpcClientDnsOnly.__init__(self)
814
815
816 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
817   """RPC wrappers for calls using only DNS.
818
819   """
820   def __init__(self):
821     """Initialize this class.
822
823     """
824     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
825                             _ENCODERS.get)
826     _generated_rpc.RpcClientDnsOnly.__init__(self)
827
828
829 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
830   """RPC wrappers for L{config}.
831
832   """
833   def __init__(self, context, address_list, _req_process_fn=None,
834                _getents=None):
835     """Initializes this class.
836
837     """
838     if context:
839       lock_monitor_cb = context.glm.AddToLockMonitor
840     else:
841       lock_monitor_cb = None
842
843     if address_list is None:
844       resolver = compat.partial(_SsconfResolver, True)
845     else:
846       # Caller provided an address list
847       resolver = _StaticResolver(address_list)
848
849     encoders = _ENCODERS.copy()
850
851     encoders.update({
852       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
853       })
854
855     _RpcClientBase.__init__(self, resolver, encoders.get,
856                             lock_monitor_cb=lock_monitor_cb,
857                             _req_process_fn=_req_process_fn)
858     _generated_rpc.RpcClientConfig.__init__(self)