Use hvparams in GetInstanceInfo
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38 import copy
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
53
54 # Special module generated at build time
55 from ganeti import _generated_rpc
56
57 # pylint has a bug here, doesn't see this import
58 import ganeti.http.client  # pylint: disable=W0611
59
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 #: Special value to describe an offline host
67 _OFFLINE = object()
68
69
70 def Init():
71   """Initializes the module-global HTTP client manager.
72
73   Must be called before using any RPC function and while exactly one thread is
74   running.
75
76   """
77   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78   # one thread running. This check is just a safety measure -- it doesn't
79   # cover all cases.
80   assert threading.activeCount() == 1, \
81          "Found more than one active thread when initializing pycURL"
82
83   logging.info("Using PycURL %s", pycurl.version)
84
85   pycurl.global_init(pycurl.GLOBAL_ALL)
86
87
88 def Shutdown():
89   """Stops the module-global HTTP client manager.
90
91   Must be called before quitting the program and while exactly one thread is
92   running.
93
94   """
95   pycurl.global_cleanup()
96
97
98 def _ConfigRpcCurl(curl):
99   noded_cert = str(pathutils.NODED_CERT_FILE)
100
101   curl.setopt(pycurl.FOLLOWLOCATION, False)
102   curl.setopt(pycurl.CAINFO, noded_cert)
103   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104   curl.setopt(pycurl.SSL_VERIFYPEER, True)
105   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106   curl.setopt(pycurl.SSLCERT, noded_cert)
107   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108   curl.setopt(pycurl.SSLKEY, noded_cert)
109   curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110
111
112 def RunWithRPC(fn):
113   """RPC-wrapper decorator.
114
115   When applied to a function, it runs it with the RPC system
116   initialized, and it shutsdown the system afterwards. This means the
117   function must be called without RPC being initialized.
118
119   """
120   def wrapper(*args, **kwargs):
121     Init()
122     try:
123       return fn(*args, **kwargs)
124     finally:
125       Shutdown()
126   return wrapper
127
128
129 def _Compress(data):
130   """Compresses a string for transport over RPC.
131
132   Small amounts of data are not compressed.
133
134   @type data: str
135   @param data: Data
136   @rtype: tuple
137   @return: Encoded data to send
138
139   """
140   # Small amounts of data are not compressed
141   if len(data) < 512:
142     return (constants.RPC_ENCODING_NONE, data)
143
144   # Compress with zlib and encode in base64
145   return (constants.RPC_ENCODING_ZLIB_BASE64,
146           base64.b64encode(zlib.compress(data, 3)))
147
148
149 class RpcResult(object):
150   """RPC Result class.
151
152   This class holds an RPC result. It is needed since in multi-node
153   calls we can't raise an exception just because one out of many
154   failed, and therefore we use this class to encapsulate the result.
155
156   @ivar data: the data payload, for successful results, or None
157   @ivar call: the name of the RPC call
158   @ivar node: the name of the node to which we made the call
159   @ivar offline: whether the operation failed because the node was
160       offline, as opposed to actual failure; offline=True will always
161       imply failed=True, in order to allow simpler checking if
162       the user doesn't care about the exact failure mode
163   @ivar fail_msg: the error message if the call failed
164
165   """
166   def __init__(self, data=None, failed=False, offline=False,
167                call=None, node=None):
168     self.offline = offline
169     self.call = call
170     self.node = node
171
172     if offline:
173       self.fail_msg = "Node is marked offline"
174       self.data = self.payload = None
175     elif failed:
176       self.fail_msg = self._EnsureErr(data)
177       self.data = self.payload = None
178     else:
179       self.data = data
180       if not isinstance(self.data, (tuple, list)):
181         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182                          type(self.data))
183         self.payload = None
184       elif len(data) != 2:
185         self.fail_msg = ("RPC layer error: invalid result length (%d), "
186                          "expected 2" % len(self.data))
187         self.payload = None
188       elif not self.data[0]:
189         self.fail_msg = self._EnsureErr(self.data[1])
190         self.payload = None
191       else:
192         # finally success
193         self.fail_msg = None
194         self.payload = data[1]
195
196     for attr_name in ["call", "data", "fail_msg",
197                       "node", "offline", "payload"]:
198       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199
200   @staticmethod
201   def _EnsureErr(val):
202     """Helper to ensure we return a 'True' value for error."""
203     if val:
204       return val
205     else:
206       return "No error information"
207
208   def Raise(self, msg, prereq=False, ecode=None):
209     """If the result has failed, raise an OpExecError.
210
211     This is used so that LU code doesn't have to check for each
212     result, but instead can call this function.
213
214     """
215     if not self.fail_msg:
216       return
217
218     if not msg: # one could pass None for default message
219       msg = ("Call '%s' to node '%s' has failed: %s" %
220              (self.call, self.node, self.fail_msg))
221     else:
222       msg = "%s: %s" % (msg, self.fail_msg)
223     if prereq:
224       ec = errors.OpPrereqError
225     else:
226       ec = errors.OpExecError
227     if ecode is not None:
228       args = (msg, ecode)
229     else:
230       args = (msg, )
231     raise ec(*args) # pylint: disable=W0142
232
233   def Warn(self, msg, feedback_fn):
234     """If the result has failed, call the feedback_fn.
235
236     This is used to in cases were LU wants to warn the
237     user about a failure, but continue anyway.
238
239     """
240     if not self.fail_msg:
241       return
242
243     msg = "%s: %s" % (msg, self.fail_msg)
244     feedback_fn(msg)
245
246
247 def _SsconfResolver(ssconf_ips, node_list, _,
248                     ssc=ssconf.SimpleStore,
249                     nslookup_fn=netutils.Hostname.GetIP):
250   """Return addresses for given node names.
251
252   @type ssconf_ips: bool
253   @param ssconf_ips: Use the ssconf IPs
254   @type node_list: list
255   @param node_list: List of node names
256   @type ssc: class
257   @param ssc: SimpleStore class that is used to obtain node->ip mappings
258   @type nslookup_fn: callable
259   @param nslookup_fn: function use to do NS lookup
260   @rtype: list of tuple; (string, string)
261   @return: List of tuples containing node name and IP address
262
263   """
264   ss = ssc()
265   family = ss.GetPrimaryIPFamily()
266
267   if ssconf_ips:
268     iplist = ss.GetNodePrimaryIPList()
269     ipmap = dict(entry.split() for entry in iplist)
270   else:
271     ipmap = {}
272
273   result = []
274   for node in node_list:
275     ip = ipmap.get(node)
276     if ip is None:
277       ip = nslookup_fn(node, family=family)
278     result.append((node, ip))
279
280   return result
281
282
283 class _StaticResolver:
284   def __init__(self, addresses):
285     """Initializes this class.
286
287     """
288     self._addresses = addresses
289
290   def __call__(self, hosts, _):
291     """Returns static addresses for hosts.
292
293     """
294     assert len(hosts) == len(self._addresses)
295     return zip(hosts, self._addresses)
296
297
298 def _CheckConfigNode(name, node, accept_offline_node):
299   """Checks if a node is online.
300
301   @type name: string
302   @param name: Node name
303   @type node: L{objects.Node} or None
304   @param node: Node object
305
306   """
307   if node is None:
308     # Depend on DNS for name resolution
309     ip = name
310   elif node.offline and not accept_offline_node:
311     ip = _OFFLINE
312   else:
313     ip = node.primary_ip
314   return (name, ip)
315
316
317 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
318   """Calculate node addresses using configuration.
319
320   """
321   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
322
323   assert accept_offline_node or opts is None, "Unknown option"
324
325   # Special case for single-host lookups
326   if len(hosts) == 1:
327     (name, ) = hosts
328     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
329   else:
330     all_nodes = all_nodes_fn()
331     return [_CheckConfigNode(name, all_nodes.get(name, None),
332                              accept_offline_node)
333             for name in hosts]
334
335
336 class _RpcProcessor:
337   def __init__(self, resolver, port, lock_monitor_cb=None):
338     """Initializes this class.
339
340     @param resolver: callable accepting a list of hostnames, returning a list
341       of tuples containing name and IP address (IP address can be the name or
342       the special value L{_OFFLINE} to mark offline machines)
343     @type port: int
344     @param port: TCP port
345     @param lock_monitor_cb: Callable for registering with lock monitor
346
347     """
348     self._resolver = resolver
349     self._port = port
350     self._lock_monitor_cb = lock_monitor_cb
351
352   @staticmethod
353   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
354     """Prepares requests by sorting offline hosts into separate list.
355
356     @type body: dict
357     @param body: a dictionary with per-host body data
358
359     """
360     results = {}
361     requests = {}
362
363     assert isinstance(body, dict)
364     assert len(body) == len(hosts)
365     assert compat.all(isinstance(v, str) for v in body.values())
366     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
367         "%s != %s" % (hosts, body.keys())
368
369     for (name, ip) in hosts:
370       if ip is _OFFLINE:
371         # Node is marked as offline
372         results[name] = RpcResult(node=name, offline=True, call=procedure)
373       else:
374         requests[name] = \
375           http.client.HttpClientRequest(str(ip), port,
376                                         http.HTTP_POST, str("/%s" % procedure),
377                                         headers=_RPC_CLIENT_HEADERS,
378                                         post_data=body[name],
379                                         read_timeout=read_timeout,
380                                         nicename="%s/%s" % (name, procedure),
381                                         curl_config_fn=_ConfigRpcCurl)
382
383     return (results, requests)
384
385   @staticmethod
386   def _CombineResults(results, requests, procedure):
387     """Combines pre-computed results for offline hosts with actual call results.
388
389     """
390     for name, req in requests.items():
391       if req.success and req.resp_status_code == http.HTTP_OK:
392         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393                                 node=name, call=procedure)
394       else:
395         # TODO: Better error reporting
396         if req.error:
397           msg = req.error
398         else:
399           msg = req.resp_body
400
401         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402         host_result = RpcResult(data=msg, failed=True, node=name,
403                                 call=procedure)
404
405       results[name] = host_result
406
407     return results
408
409   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
410                _req_process_fn=None):
411     """Makes an RPC request to a number of nodes.
412
413     @type hosts: sequence
414     @param hosts: Hostnames
415     @type procedure: string
416     @param procedure: Request path
417     @type body: dictionary
418     @param body: dictionary with request bodies per host
419     @type read_timeout: int or None
420     @param read_timeout: Read timeout for request
421     @rtype: dictionary
422     @return: a dictionary mapping host names to rpc.RpcResult objects
423
424     """
425     assert read_timeout is not None, \
426       "Missing RPC read timeout for procedure '%s'" % procedure
427
428     if _req_process_fn is None:
429       _req_process_fn = http.client.ProcessRequests
430
431     (results, requests) = \
432       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
433                             procedure, body, read_timeout)
434
435     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
436
437     assert not frozenset(results).intersection(requests)
438
439     return self._CombineResults(results, requests, procedure)
440
441
442 class _RpcClientBase:
443   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
444                _req_process_fn=None):
445     """Initializes this class.
446
447     """
448     proc = _RpcProcessor(resolver,
449                          netutils.GetDaemonPort(constants.NODED),
450                          lock_monitor_cb=lock_monitor_cb)
451     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
452     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
453
454   @staticmethod
455   def _EncodeArg(encoder_fn, (argkind, value)):
456     """Encode argument.
457
458     """
459     if argkind is None:
460       return value
461     else:
462       return encoder_fn(argkind)(value)
463
464   def _Call(self, cdef, node_list, args):
465     """Entry point for automatically generated RPC wrappers.
466
467     """
468     (procedure, _, resolver_opts, timeout, argdefs,
469      prep_fn, postproc_fn, _) = cdef
470
471     if callable(timeout):
472       read_timeout = timeout(args)
473     else:
474       read_timeout = timeout
475
476     if callable(resolver_opts):
477       req_resolver_opts = resolver_opts(args)
478     else:
479       req_resolver_opts = resolver_opts
480
481     if len(args) != len(argdefs):
482       raise errors.ProgrammerError("Number of passed arguments doesn't match")
483
484     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
485     if prep_fn is None:
486       # for a no-op prep_fn, we serialise the body once, and then we
487       # reuse it in the dictionary values
488       body = serializer.DumpJson(enc_args)
489       pnbody = dict((n, body) for n in node_list)
490     else:
491       # for a custom prep_fn, we pass the encoded arguments and the
492       # node name to the prep_fn, and we serialise its return value
493       assert callable(prep_fn)
494       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
495                     for n in node_list)
496
497     result = self._proc(node_list, procedure, pnbody, read_timeout,
498                         req_resolver_opts)
499
500     if postproc_fn:
501       return dict(map(lambda (key, value): (key, postproc_fn(value)),
502                       result.items()))
503     else:
504       return result
505
506
507 def _ObjectToDict(value):
508   """Converts an object to a dictionary.
509
510   @note: See L{objects}.
511
512   """
513   return value.ToDict()
514
515
516 def _ObjectListToDict(value):
517   """Converts a list of L{objects} to dictionaries.
518
519   """
520   return map(_ObjectToDict, value)
521
522
523 def _EncodeNodeToDiskDict(value):
524   """Encodes a dictionary with node name as key and disk objects as values.
525
526   """
527   return dict((name, _ObjectListToDict(disks))
528               for name, disks in value.items())
529
530
531 def _PrepareFileUpload(getents_fn, filename):
532   """Loads a file and prepares it for an upload to nodes.
533
534   """
535   statcb = utils.FileStatHelper()
536   data = _Compress(utils.ReadFile(filename, preread=statcb))
537   st = statcb.st
538
539   if getents_fn is None:
540     getents_fn = runtime.GetEnts
541
542   getents = getents_fn()
543
544   virt_filename = vcluster.MakeVirtualPath(filename)
545
546   return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
547           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
548
549
550 def _PrepareFinalizeExportDisks(snap_disks):
551   """Encodes disks for finalizing export.
552
553   """
554   flat_disks = []
555
556   for disk in snap_disks:
557     if isinstance(disk, bool):
558       flat_disks.append(disk)
559     else:
560       flat_disks.append(disk.ToDict())
561
562   return flat_disks
563
564
565 def _EncodeImportExportIO((ieio, ieioargs)):
566   """Encodes import/export I/O information.
567
568   """
569   if ieio == constants.IEIO_RAW_DISK:
570     assert len(ieioargs) == 1
571     return (ieio, (ieioargs[0].ToDict(), ))
572
573   if ieio == constants.IEIO_SCRIPT:
574     assert len(ieioargs) == 2
575     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
576
577   return (ieio, ieioargs)
578
579
580 def _EncodeBlockdevRename(value):
581   """Encodes information for renaming block devices.
582
583   """
584   return [(d.ToDict(), uid) for d, uid in value]
585
586
587 def BuildVgInfoQuery(cfg):
588   """Build a query about the default VG for C{node_info}.
589
590   The result of the RPC can be parsed with L{MakeLegacyNodeInfo}.
591
592   @type cfg: L{config.ConfigWriter}
593   @param cfg: Cluster configuration
594   @rtype: list
595   @return: argument suitable for L{rpc.RpcRunner.call_node_info}
596
597   """
598   vg_name = cfg.GetVGName()
599   if vg_name:
600     ret = [
601       (constants.ST_LVM_VG, vg_name),
602       (constants.ST_LVM_PV, vg_name),
603       ]
604   else:
605     ret = []
606   return ret
607
608
609 def MakeLegacyNodeInfo(data, require_vg_info=True):
610   """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
611
612   Converts the data into a single dictionary. This is fine for most use cases,
613   but some require information from more than one volume group or hypervisor.
614
615   @param require_vg_info: raise an error if the returnd vg_info
616       doesn't have any values
617
618   """
619   (bootid, vgs_info, (hv_info, )) = data
620
621   ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
622
623   if require_vg_info or vgs_info:
624     (vg0_info, vg0_spindles) = vgs_info
625     ret = utils.JoinDisjointDicts(vg0_info, ret)
626     ret["spindles_free"] = vg0_spindles["vg_free"]
627     ret["spindles_total"] = vg0_spindles["vg_size"]
628
629   return ret
630
631
632 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
633   """Annotates just DRBD disks layouts.
634
635   """
636   assert disk.dev_type == constants.LD_DRBD8
637
638   disk.params = objects.FillDict(drbd_params, disk.params)
639   (dev_data, dev_meta) = disk.children
640   dev_data.params = objects.FillDict(data_params, dev_data.params)
641   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
642
643   return disk
644
645
646 def _AnnotateDParamsGeneric(disk, (params, )):
647   """Generic disk parameter annotation routine.
648
649   """
650   assert disk.dev_type != constants.LD_DRBD8
651
652   disk.params = objects.FillDict(params, disk.params)
653
654   return disk
655
656
657 def AnnotateDiskParams(template, disks, disk_params):
658   """Annotates the disk objects with the disk parameters.
659
660   @param template: The disk template used
661   @param disks: The list of disks objects to annotate
662   @param disk_params: The disk paramaters for annotation
663   @returns: A list of disk objects annotated
664
665   """
666   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
667
668   if template == constants.DT_DRBD8:
669     annotation_fn = _AnnotateDParamsDRBD
670   elif template == constants.DT_DISKLESS:
671     annotation_fn = lambda disk, _: disk
672   else:
673     annotation_fn = _AnnotateDParamsGeneric
674
675   return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
676
677
678 def _GetESFlag(cfg, nodename):
679   ni = cfg.GetNodeInfo(nodename)
680   if ni is None:
681     raise errors.OpPrereqError("Invalid node name %s" % nodename,
682                                errors.ECODE_NOENT)
683   return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
684
685
686 def GetExclusiveStorageForNodeNames(cfg, nodelist):
687   """Return the exclusive storage flag for all the given nodes.
688
689   @type cfg: L{config.ConfigWriter}
690   @param cfg: cluster configuration
691   @type nodelist: list or tuple
692   @param nodelist: node names for which to read the flag
693   @rtype: dict
694   @return: mapping from node names to exclusive storage flags
695   @raise errors.OpPrereqError: if any given node name has no corresponding node
696
697   """
698   getflag = lambda n: _GetESFlag(cfg, n)
699   flags = map(getflag, nodelist)
700   return dict(zip(nodelist, flags))
701
702
703 #: Generic encoders
704 _ENCODERS = {
705   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
706   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
707   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
708   rpc_defs.ED_COMPRESS: _Compress,
709   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
710   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
711   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
712   }
713
714
715 class RpcRunner(_RpcClientBase,
716                 _generated_rpc.RpcClientDefault,
717                 _generated_rpc.RpcClientBootstrap,
718                 _generated_rpc.RpcClientDnsOnly,
719                 _generated_rpc.RpcClientConfig):
720   """RPC runner class.
721
722   """
723   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
724     """Initialized the RPC runner.
725
726     @type cfg: L{config.ConfigWriter}
727     @param cfg: Configuration
728     @type lock_monitor_cb: callable
729     @param lock_monitor_cb: Lock monitor callback
730
731     """
732     self._cfg = cfg
733
734     encoders = _ENCODERS.copy()
735
736     encoders.update({
737       # Encoders requiring configuration object
738       rpc_defs.ED_INST_DICT: self._InstDict,
739       rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
740       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
741       rpc_defs.ED_NIC_DICT: self._NicDict,
742
743       # Encoders annotating disk parameters
744       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
745       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
746
747       # Encoders with special requirements
748       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
749       })
750
751     # Resolver using configuration
752     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
753                               cfg.GetAllNodesInfo)
754
755     # Pylint doesn't recognize multiple inheritance properly, see
756     # <http://www.logilab.org/ticket/36586> and
757     # <http://www.logilab.org/ticket/35642>
758     # pylint: disable=W0233
759     _RpcClientBase.__init__(self, resolver, encoders.get,
760                             lock_monitor_cb=lock_monitor_cb,
761                             _req_process_fn=_req_process_fn)
762     _generated_rpc.RpcClientConfig.__init__(self)
763     _generated_rpc.RpcClientBootstrap.__init__(self)
764     _generated_rpc.RpcClientDnsOnly.__init__(self)
765     _generated_rpc.RpcClientDefault.__init__(self)
766
767   def _NicDict(self, nic):
768     """Convert the given nic to a dict and encapsulate netinfo
769
770     """
771     n = copy.deepcopy(nic)
772     if n.network:
773       net_uuid = self._cfg.LookupNetwork(n.network)
774       if net_uuid:
775         nobj = self._cfg.GetNetwork(net_uuid)
776         n.netinfo = objects.Network.ToDict(nobj)
777     return n.ToDict()
778
779   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
780     """Convert the given instance to a dict.
781
782     This is done via the instance's ToDict() method and additionally
783     we fill the hvparams with the cluster defaults.
784
785     @type instance: L{objects.Instance}
786     @param instance: an Instance object
787     @type hvp: dict or None
788     @param hvp: a dictionary with overridden hypervisor parameters
789     @type bep: dict or None
790     @param bep: a dictionary with overridden backend parameters
791     @type osp: dict or None
792     @param osp: a dictionary with overridden os parameters
793     @rtype: dict
794     @return: the instance dict, with the hvparams filled with the
795         cluster defaults
796
797     """
798     idict = instance.ToDict()
799     cluster = self._cfg.GetClusterInfo()
800     idict["hvparams"] = cluster.FillHV(instance)
801     if hvp is not None:
802       idict["hvparams"].update(hvp)
803     idict["beparams"] = cluster.FillBE(instance)
804     if bep is not None:
805       idict["beparams"].update(bep)
806     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
807     if osp is not None:
808       idict["osparams"].update(osp)
809     idict["disks"] = self._DisksDictDP((instance.disks, instance))
810     for nic in idict["nics"]:
811       nic["nicparams"] = objects.FillDict(
812         cluster.nicparams[constants.PP_DEFAULT],
813         nic["nicparams"])
814       network = nic.get("network", None)
815       if network:
816         net_uuid = self._cfg.LookupNetwork(network)
817         if net_uuid:
818           nobj = self._cfg.GetNetwork(net_uuid)
819           nic["netinfo"] = objects.Network.ToDict(nobj)
820     return idict
821
822   def _InstDictHvpBepDp(self, (instance, hvp, bep)):
823     """Wrapper for L{_InstDict}.
824
825     """
826     return self._InstDict(instance, hvp=hvp, bep=bep)
827
828   def _InstDictOspDp(self, (instance, osparams)):
829     """Wrapper for L{_InstDict}.
830
831     """
832     return self._InstDict(instance, osp=osparams)
833
834   def _DisksDictDP(self, (disks, instance)):
835     """Wrapper for L{AnnotateDiskParams}.
836
837     """
838     diskparams = self._cfg.GetInstanceDiskParams(instance)
839     return [disk.ToDict()
840             for disk in AnnotateDiskParams(instance.disk_template,
841                                            disks, diskparams)]
842
843   def _SingleDiskDictDP(self, (disk, instance)):
844     """Wrapper for L{AnnotateDiskParams}.
845
846     """
847     (anno_disk,) = self._DisksDictDP(([disk], instance))
848     return anno_disk
849
850
851 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
852   """RPC wrappers for job queue.
853
854   """
855   def __init__(self, context, address_list):
856     """Initializes this class.
857
858     """
859     if address_list is None:
860       resolver = compat.partial(_SsconfResolver, True)
861     else:
862       # Caller provided an address list
863       resolver = _StaticResolver(address_list)
864
865     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
866                             lock_monitor_cb=context.glm.AddToLockMonitor)
867     _generated_rpc.RpcClientJobQueue.__init__(self)
868
869
870 class BootstrapRunner(_RpcClientBase,
871                       _generated_rpc.RpcClientBootstrap,
872                       _generated_rpc.RpcClientDnsOnly):
873   """RPC wrappers for bootstrapping.
874
875   """
876   def __init__(self):
877     """Initializes this class.
878
879     """
880     # Pylint doesn't recognize multiple inheritance properly, see
881     # <http://www.logilab.org/ticket/36586> and
882     # <http://www.logilab.org/ticket/35642>
883     # pylint: disable=W0233
884     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
885                             _ENCODERS.get)
886     _generated_rpc.RpcClientBootstrap.__init__(self)
887     _generated_rpc.RpcClientDnsOnly.__init__(self)
888
889
890 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
891   """RPC wrappers for calls using only DNS.
892
893   """
894   def __init__(self):
895     """Initialize this class.
896
897     """
898     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
899                             _ENCODERS.get)
900     _generated_rpc.RpcClientDnsOnly.__init__(self)
901
902
903 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
904   """RPC wrappers for L{config}.
905
906   """
907   def __init__(self, context, address_list, _req_process_fn=None,
908                _getents=None):
909     """Initializes this class.
910
911     """
912     if context:
913       lock_monitor_cb = context.glm.AddToLockMonitor
914     else:
915       lock_monitor_cb = None
916
917     if address_list is None:
918       resolver = compat.partial(_SsconfResolver, True)
919     else:
920       # Caller provided an address list
921       resolver = _StaticResolver(address_list)
922
923     encoders = _ENCODERS.copy()
924
925     encoders.update({
926       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
927       })
928
929     _RpcClientBase.__init__(self, resolver, encoders.get,
930                             lock_monitor_cb=lock_monitor_cb,
931                             _req_process_fn=_req_process_fn)
932     _generated_rpc.RpcClientConfig.__init__(self)