Locking fixes regarding Issue 324
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38 import copy
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
53
54 # Special module generated at build time
55 from ganeti import _generated_rpc
56
57 # pylint has a bug here, doesn't see this import
58 import ganeti.http.client  # pylint: disable=W0611
59
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 #: Special value to describe an offline host
67 _OFFLINE = object()
68
69
70 def Init():
71   """Initializes the module-global HTTP client manager.
72
73   Must be called before using any RPC function and while exactly one thread is
74   running.
75
76   """
77   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78   # one thread running. This check is just a safety measure -- it doesn't
79   # cover all cases.
80   assert threading.activeCount() == 1, \
81          "Found more than one active thread when initializing pycURL"
82
83   logging.info("Using PycURL %s", pycurl.version)
84
85   pycurl.global_init(pycurl.GLOBAL_ALL)
86
87
88 def Shutdown():
89   """Stops the module-global HTTP client manager.
90
91   Must be called before quitting the program and while exactly one thread is
92   running.
93
94   """
95   pycurl.global_cleanup()
96
97
98 def _ConfigRpcCurl(curl):
99   noded_cert = str(pathutils.NODED_CERT_FILE)
100
101   curl.setopt(pycurl.FOLLOWLOCATION, False)
102   curl.setopt(pycurl.CAINFO, noded_cert)
103   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104   curl.setopt(pycurl.SSL_VERIFYPEER, True)
105   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106   curl.setopt(pycurl.SSLCERT, noded_cert)
107   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108   curl.setopt(pycurl.SSLKEY, noded_cert)
109   curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110
111
112 def RunWithRPC(fn):
113   """RPC-wrapper decorator.
114
115   When applied to a function, it runs it with the RPC system
116   initialized, and it shutsdown the system afterwards. This means the
117   function must be called without RPC being initialized.
118
119   """
120   def wrapper(*args, **kwargs):
121     Init()
122     try:
123       return fn(*args, **kwargs)
124     finally:
125       Shutdown()
126   return wrapper
127
128
129 def _Compress(data):
130   """Compresses a string for transport over RPC.
131
132   Small amounts of data are not compressed.
133
134   @type data: str
135   @param data: Data
136   @rtype: tuple
137   @return: Encoded data to send
138
139   """
140   # Small amounts of data are not compressed
141   if len(data) < 512:
142     return (constants.RPC_ENCODING_NONE, data)
143
144   # Compress with zlib and encode in base64
145   return (constants.RPC_ENCODING_ZLIB_BASE64,
146           base64.b64encode(zlib.compress(data, 3)))
147
148
149 class RpcResult(object):
150   """RPC Result class.
151
152   This class holds an RPC result. It is needed since in multi-node
153   calls we can't raise an exception just because one out of many
154   failed, and therefore we use this class to encapsulate the result.
155
156   @ivar data: the data payload, for successful results, or None
157   @ivar call: the name of the RPC call
158   @ivar node: the name of the node to which we made the call
159   @ivar offline: whether the operation failed because the node was
160       offline, as opposed to actual failure; offline=True will always
161       imply failed=True, in order to allow simpler checking if
162       the user doesn't care about the exact failure mode
163   @ivar fail_msg: the error message if the call failed
164
165   """
166   def __init__(self, data=None, failed=False, offline=False,
167                call=None, node=None):
168     self.offline = offline
169     self.call = call
170     self.node = node
171
172     if offline:
173       self.fail_msg = "Node is marked offline"
174       self.data = self.payload = None
175     elif failed:
176       self.fail_msg = self._EnsureErr(data)
177       self.data = self.payload = None
178     else:
179       self.data = data
180       if not isinstance(self.data, (tuple, list)):
181         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182                          type(self.data))
183         self.payload = None
184       elif len(data) != 2:
185         self.fail_msg = ("RPC layer error: invalid result length (%d), "
186                          "expected 2" % len(self.data))
187         self.payload = None
188       elif not self.data[0]:
189         self.fail_msg = self._EnsureErr(self.data[1])
190         self.payload = None
191       else:
192         # finally success
193         self.fail_msg = None
194         self.payload = data[1]
195
196     for attr_name in ["call", "data", "fail_msg",
197                       "node", "offline", "payload"]:
198       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199
200   @staticmethod
201   def _EnsureErr(val):
202     """Helper to ensure we return a 'True' value for error."""
203     if val:
204       return val
205     else:
206       return "No error information"
207
208   def Raise(self, msg, prereq=False, ecode=None):
209     """If the result has failed, raise an OpExecError.
210
211     This is used so that LU code doesn't have to check for each
212     result, but instead can call this function.
213
214     """
215     if not self.fail_msg:
216       return
217
218     if not msg: # one could pass None for default message
219       msg = ("Call '%s' to node '%s' has failed: %s" %
220              (self.call, self.node, self.fail_msg))
221     else:
222       msg = "%s: %s" % (msg, self.fail_msg)
223     if prereq:
224       ec = errors.OpPrereqError
225     else:
226       ec = errors.OpExecError
227     if ecode is not None:
228       args = (msg, ecode)
229     else:
230       args = (msg, )
231     raise ec(*args) # pylint: disable=W0142
232
233
234 def _SsconfResolver(ssconf_ips, node_list, _,
235                     ssc=ssconf.SimpleStore,
236                     nslookup_fn=netutils.Hostname.GetIP):
237   """Return addresses for given node names.
238
239   @type ssconf_ips: bool
240   @param ssconf_ips: Use the ssconf IPs
241   @type node_list: list
242   @param node_list: List of node names
243   @type ssc: class
244   @param ssc: SimpleStore class that is used to obtain node->ip mappings
245   @type nslookup_fn: callable
246   @param nslookup_fn: function use to do NS lookup
247   @rtype: list of tuple; (string, string)
248   @return: List of tuples containing node name and IP address
249
250   """
251   ss = ssc()
252   family = ss.GetPrimaryIPFamily()
253
254   if ssconf_ips:
255     iplist = ss.GetNodePrimaryIPList()
256     ipmap = dict(entry.split() for entry in iplist)
257   else:
258     ipmap = {}
259
260   result = []
261   for node in node_list:
262     ip = ipmap.get(node)
263     if ip is None:
264       ip = nslookup_fn(node, family=family)
265     result.append((node, ip))
266
267   return result
268
269
270 class _StaticResolver:
271   def __init__(self, addresses):
272     """Initializes this class.
273
274     """
275     self._addresses = addresses
276
277   def __call__(self, hosts, _):
278     """Returns static addresses for hosts.
279
280     """
281     assert len(hosts) == len(self._addresses)
282     return zip(hosts, self._addresses)
283
284
285 def _CheckConfigNode(name, node, accept_offline_node):
286   """Checks if a node is online.
287
288   @type name: string
289   @param name: Node name
290   @type node: L{objects.Node} or None
291   @param node: Node object
292
293   """
294   if node is None:
295     # Depend on DNS for name resolution
296     ip = name
297   elif node.offline and not accept_offline_node:
298     ip = _OFFLINE
299   else:
300     ip = node.primary_ip
301   return (name, ip)
302
303
304 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305   """Calculate node addresses using configuration.
306
307   """
308   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
309
310   assert accept_offline_node or opts is None, "Unknown option"
311
312   # Special case for single-host lookups
313   if len(hosts) == 1:
314     (name, ) = hosts
315     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316   else:
317     all_nodes = all_nodes_fn()
318     return [_CheckConfigNode(name, all_nodes.get(name, None),
319                              accept_offline_node)
320             for name in hosts]
321
322
323 class _RpcProcessor:
324   def __init__(self, resolver, port, lock_monitor_cb=None):
325     """Initializes this class.
326
327     @param resolver: callable accepting a list of hostnames, returning a list
328       of tuples containing name and IP address (IP address can be the name or
329       the special value L{_OFFLINE} to mark offline machines)
330     @type port: int
331     @param port: TCP port
332     @param lock_monitor_cb: Callable for registering with lock monitor
333
334     """
335     self._resolver = resolver
336     self._port = port
337     self._lock_monitor_cb = lock_monitor_cb
338
339   @staticmethod
340   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
341     """Prepares requests by sorting offline hosts into separate list.
342
343     @type body: dict
344     @param body: a dictionary with per-host body data
345
346     """
347     results = {}
348     requests = {}
349
350     assert isinstance(body, dict)
351     assert len(body) == len(hosts)
352     assert compat.all(isinstance(v, str) for v in body.values())
353     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354         "%s != %s" % (hosts, body.keys())
355
356     for (name, ip) in hosts:
357       if ip is _OFFLINE:
358         # Node is marked as offline
359         results[name] = RpcResult(node=name, offline=True, call=procedure)
360       else:
361         requests[name] = \
362           http.client.HttpClientRequest(str(ip), port,
363                                         http.HTTP_POST, str("/%s" % procedure),
364                                         headers=_RPC_CLIENT_HEADERS,
365                                         post_data=body[name],
366                                         read_timeout=read_timeout,
367                                         nicename="%s/%s" % (name, procedure),
368                                         curl_config_fn=_ConfigRpcCurl)
369
370     return (results, requests)
371
372   @staticmethod
373   def _CombineResults(results, requests, procedure):
374     """Combines pre-computed results for offline hosts with actual call results.
375
376     """
377     for name, req in requests.items():
378       if req.success and req.resp_status_code == http.HTTP_OK:
379         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380                                 node=name, call=procedure)
381       else:
382         # TODO: Better error reporting
383         if req.error:
384           msg = req.error
385         else:
386           msg = req.resp_body
387
388         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389         host_result = RpcResult(data=msg, failed=True, node=name,
390                                 call=procedure)
391
392       results[name] = host_result
393
394     return results
395
396   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397                _req_process_fn=None):
398     """Makes an RPC request to a number of nodes.
399
400     @type hosts: sequence
401     @param hosts: Hostnames
402     @type procedure: string
403     @param procedure: Request path
404     @type body: dictionary
405     @param body: dictionary with request bodies per host
406     @type read_timeout: int or None
407     @param read_timeout: Read timeout for request
408     @rtype: dictionary
409     @return: a dictionary mapping host names to rpc.RpcResult objects
410
411     """
412     assert read_timeout is not None, \
413       "Missing RPC read timeout for procedure '%s'" % procedure
414
415     if _req_process_fn is None:
416       _req_process_fn = http.client.ProcessRequests
417
418     (results, requests) = \
419       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420                             procedure, body, read_timeout)
421
422     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423
424     assert not frozenset(results).intersection(requests)
425
426     return self._CombineResults(results, requests, procedure)
427
428
429 class _RpcClientBase:
430   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431                _req_process_fn=None):
432     """Initializes this class.
433
434     """
435     proc = _RpcProcessor(resolver,
436                          netutils.GetDaemonPort(constants.NODED),
437                          lock_monitor_cb=lock_monitor_cb)
438     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
440
441   @staticmethod
442   def _EncodeArg(encoder_fn, (argkind, value)):
443     """Encode argument.
444
445     """
446     if argkind is None:
447       return value
448     else:
449       return encoder_fn(argkind)(value)
450
451   def _Call(self, cdef, node_list, args):
452     """Entry point for automatically generated RPC wrappers.
453
454     """
455     (procedure, _, resolver_opts, timeout, argdefs,
456      prep_fn, postproc_fn, _) = cdef
457
458     if callable(timeout):
459       read_timeout = timeout(args)
460     else:
461       read_timeout = timeout
462
463     if callable(resolver_opts):
464       req_resolver_opts = resolver_opts(args)
465     else:
466       req_resolver_opts = resolver_opts
467
468     if len(args) != len(argdefs):
469       raise errors.ProgrammerError("Number of passed arguments doesn't match")
470
471     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472     if prep_fn is None:
473       # for a no-op prep_fn, we serialise the body once, and then we
474       # reuse it in the dictionary values
475       body = serializer.DumpJson(enc_args)
476       pnbody = dict((n, body) for n in node_list)
477     else:
478       # for a custom prep_fn, we pass the encoded arguments and the
479       # node name to the prep_fn, and we serialise its return value
480       assert callable(prep_fn)
481       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482                     for n in node_list)
483
484     result = self._proc(node_list, procedure, pnbody, read_timeout,
485                         req_resolver_opts)
486
487     if postproc_fn:
488       return dict(map(lambda (key, value): (key, postproc_fn(value)),
489                       result.items()))
490     else:
491       return result
492
493
494 def _ObjectToDict(value):
495   """Converts an object to a dictionary.
496
497   @note: See L{objects}.
498
499   """
500   return value.ToDict()
501
502
503 def _ObjectListToDict(value):
504   """Converts a list of L{objects} to dictionaries.
505
506   """
507   return map(_ObjectToDict, value)
508
509
510 def _EncodeNodeToDiskDict(value):
511   """Encodes a dictionary with node name as key and disk objects as values.
512
513   """
514   return dict((name, _ObjectListToDict(disks))
515               for name, disks in value.items())
516
517
518 def _PrepareFileUpload(getents_fn, filename):
519   """Loads a file and prepares it for an upload to nodes.
520
521   """
522   statcb = utils.FileStatHelper()
523   data = _Compress(utils.ReadFile(filename, preread=statcb))
524   st = statcb.st
525
526   if getents_fn is None:
527     getents_fn = runtime.GetEnts
528
529   getents = getents_fn()
530
531   virt_filename = vcluster.MakeVirtualPath(filename)
532
533   return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
535
536
537 def _PrepareFinalizeExportDisks(snap_disks):
538   """Encodes disks for finalizing export.
539
540   """
541   flat_disks = []
542
543   for disk in snap_disks:
544     if isinstance(disk, bool):
545       flat_disks.append(disk)
546     else:
547       flat_disks.append(disk.ToDict())
548
549   return flat_disks
550
551
552 def _EncodeImportExportIO((ieio, ieioargs)):
553   """Encodes import/export I/O information.
554
555   """
556   if ieio == constants.IEIO_RAW_DISK:
557     assert len(ieioargs) == 1
558     return (ieio, (ieioargs[0].ToDict(), ))
559
560   if ieio == constants.IEIO_SCRIPT:
561     assert len(ieioargs) == 2
562     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
563
564   return (ieio, ieioargs)
565
566
567 def _EncodeBlockdevRename(value):
568   """Encodes information for renaming block devices.
569
570   """
571   return [(d.ToDict(), uid) for d, uid in value]
572
573
574 def MakeLegacyNodeInfo(data):
575   """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
576
577   Converts the data into a single dictionary. This is fine for most use cases,
578   but some require information from more than one volume group or hypervisor.
579
580   """
581   (bootid, (vg_info, ), (hv_info, )) = data
582
583   return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
584     "bootid": bootid,
585     })
586
587
588 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
589   """Annotates just DRBD disks layouts.
590
591   """
592   assert disk.dev_type == constants.LD_DRBD8
593
594   disk.params = objects.FillDict(drbd_params, disk.params)
595   (dev_data, dev_meta) = disk.children
596   dev_data.params = objects.FillDict(data_params, dev_data.params)
597   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
598
599   return disk
600
601
602 def _AnnotateDParamsGeneric(disk, (params, )):
603   """Generic disk parameter annotation routine.
604
605   """
606   assert disk.dev_type != constants.LD_DRBD8
607
608   disk.params = objects.FillDict(params, disk.params)
609
610   return disk
611
612
613 def AnnotateDiskParams(template, disks, disk_params):
614   """Annotates the disk objects with the disk parameters.
615
616   @param template: The disk template used
617   @param disks: The list of disks objects to annotate
618   @param disk_params: The disk paramaters for annotation
619   @returns: A list of disk objects annotated
620
621   """
622   ld_params = objects.Disk.ComputeLDParams(template, disk_params)
623
624   if template == constants.DT_DRBD8:
625     annotation_fn = _AnnotateDParamsDRBD
626   elif template == constants.DT_DISKLESS:
627     annotation_fn = lambda disk, _: disk
628   else:
629     annotation_fn = _AnnotateDParamsGeneric
630
631   return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
632
633
634 def _GetESFlag(cfg, nodename):
635   ni = cfg.GetNodeInfo(nodename)
636   if ni is None:
637     raise errors.OpPrereqError("Invalid node name %s" % nodename,
638                                errors.ECODE_NOENT)
639   return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
640
641
642 def GetExclusiveStorageForNodeNames(cfg, nodelist):
643   """Return the exclusive storage flag for all the given nodes.
644
645   @type cfg: L{config.ConfigWriter}
646   @param cfg: cluster configuration
647   @type nodelist: list or tuple
648   @param nodelist: node names for which to read the flag
649   @rtype: dict
650   @return: mapping from node names to exclusive storage flags
651   @raise errors.OpPrereqError: if any given node name has no corresponding node
652
653   """
654   getflag = lambda n: _GetESFlag(cfg, n)
655   flags = map(getflag, nodelist)
656   return dict(zip(nodelist, flags))
657
658
659 #: Generic encoders
660 _ENCODERS = {
661   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
662   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
663   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
664   rpc_defs.ED_COMPRESS: _Compress,
665   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
666   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
667   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
668   }
669
670
671 class RpcRunner(_RpcClientBase,
672                 _generated_rpc.RpcClientDefault,
673                 _generated_rpc.RpcClientBootstrap,
674                 _generated_rpc.RpcClientDnsOnly,
675                 _generated_rpc.RpcClientConfig):
676   """RPC runner class.
677
678   """
679   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
680     """Initialized the RPC runner.
681
682     @type cfg: L{config.ConfigWriter}
683     @param cfg: Configuration
684     @type lock_monitor_cb: callable
685     @param lock_monitor_cb: Lock monitor callback
686
687     """
688     self._cfg = cfg
689
690     encoders = _ENCODERS.copy()
691
692     encoders.update({
693       # Encoders requiring configuration object
694       rpc_defs.ED_INST_DICT: self._InstDict,
695       rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
696       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
697       rpc_defs.ED_NIC_DICT: self._NicDict,
698
699       # Encoders annotating disk parameters
700       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
701       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
702
703       # Encoders with special requirements
704       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
705       })
706
707     # Resolver using configuration
708     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
709                               cfg.GetAllNodesInfo)
710
711     # Pylint doesn't recognize multiple inheritance properly, see
712     # <http://www.logilab.org/ticket/36586> and
713     # <http://www.logilab.org/ticket/35642>
714     # pylint: disable=W0233
715     _RpcClientBase.__init__(self, resolver, encoders.get,
716                             lock_monitor_cb=lock_monitor_cb,
717                             _req_process_fn=_req_process_fn)
718     _generated_rpc.RpcClientConfig.__init__(self)
719     _generated_rpc.RpcClientBootstrap.__init__(self)
720     _generated_rpc.RpcClientDnsOnly.__init__(self)
721     _generated_rpc.RpcClientDefault.__init__(self)
722
723   def _NicDict(self, nic):
724     """Convert the given nic to a dict and encapsulate netinfo
725
726     """
727     n = copy.deepcopy(nic)
728     if n.network:
729       net_uuid = self._cfg.LookupNetwork(n.network)
730       if net_uuid:
731         nobj = self._cfg.GetNetwork(net_uuid)
732         n.netinfo = objects.Network.ToDict(nobj)
733     return n.ToDict()
734
735   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
736     """Convert the given instance to a dict.
737
738     This is done via the instance's ToDict() method and additionally
739     we fill the hvparams with the cluster defaults.
740
741     @type instance: L{objects.Instance}
742     @param instance: an Instance object
743     @type hvp: dict or None
744     @param hvp: a dictionary with overridden hypervisor parameters
745     @type bep: dict or None
746     @param bep: a dictionary with overridden backend parameters
747     @type osp: dict or None
748     @param osp: a dictionary with overridden os parameters
749     @rtype: dict
750     @return: the instance dict, with the hvparams filled with the
751         cluster defaults
752
753     """
754     idict = instance.ToDict()
755     cluster = self._cfg.GetClusterInfo()
756     idict["hvparams"] = cluster.FillHV(instance)
757     if hvp is not None:
758       idict["hvparams"].update(hvp)
759     idict["beparams"] = cluster.FillBE(instance)
760     if bep is not None:
761       idict["beparams"].update(bep)
762     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
763     if osp is not None:
764       idict["osparams"].update(osp)
765     idict["disks"] = self._DisksDictDP((instance.disks, instance))
766     for nic in idict["nics"]:
767       nic["nicparams"] = objects.FillDict(
768         cluster.nicparams[constants.PP_DEFAULT],
769         nic["nicparams"])
770       network = nic.get("network", None)
771       if network:
772         net_uuid = self._cfg.LookupNetwork(network)
773         if net_uuid:
774           nobj = self._cfg.GetNetwork(net_uuid)
775           nic["netinfo"] = objects.Network.ToDict(nobj)
776     return idict
777
778   def _InstDictHvpBepDp(self, (instance, hvp, bep)):
779     """Wrapper for L{_InstDict}.
780
781     """
782     return self._InstDict(instance, hvp=hvp, bep=bep)
783
784   def _InstDictOspDp(self, (instance, osparams)):
785     """Wrapper for L{_InstDict}.
786
787     """
788     return self._InstDict(instance, osp=osparams)
789
790   def _DisksDictDP(self, (disks, instance)):
791     """Wrapper for L{AnnotateDiskParams}.
792
793     """
794     diskparams = self._cfg.GetInstanceDiskParams(instance)
795     return [disk.ToDict()
796             for disk in AnnotateDiskParams(instance.disk_template,
797                                            disks, diskparams)]
798
799   def _SingleDiskDictDP(self, (disk, instance)):
800     """Wrapper for L{AnnotateDiskParams}.
801
802     """
803     (anno_disk,) = self._DisksDictDP(([disk], instance))
804     return anno_disk
805
806
807 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
808   """RPC wrappers for job queue.
809
810   """
811   def __init__(self, context, address_list):
812     """Initializes this class.
813
814     """
815     if address_list is None:
816       resolver = compat.partial(_SsconfResolver, True)
817     else:
818       # Caller provided an address list
819       resolver = _StaticResolver(address_list)
820
821     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
822                             lock_monitor_cb=context.glm.AddToLockMonitor)
823     _generated_rpc.RpcClientJobQueue.__init__(self)
824
825
826 class BootstrapRunner(_RpcClientBase,
827                       _generated_rpc.RpcClientBootstrap,
828                       _generated_rpc.RpcClientDnsOnly):
829   """RPC wrappers for bootstrapping.
830
831   """
832   def __init__(self):
833     """Initializes this class.
834
835     """
836     # Pylint doesn't recognize multiple inheritance properly, see
837     # <http://www.logilab.org/ticket/36586> and
838     # <http://www.logilab.org/ticket/35642>
839     # pylint: disable=W0233
840     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
841                             _ENCODERS.get)
842     _generated_rpc.RpcClientBootstrap.__init__(self)
843     _generated_rpc.RpcClientDnsOnly.__init__(self)
844
845
846 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
847   """RPC wrappers for calls using only DNS.
848
849   """
850   def __init__(self):
851     """Initialize this class.
852
853     """
854     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
855                             _ENCODERS.get)
856     _generated_rpc.RpcClientDnsOnly.__init__(self)
857
858
859 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
860   """RPC wrappers for L{config}.
861
862   """
863   def __init__(self, context, address_list, _req_process_fn=None,
864                _getents=None):
865     """Initializes this class.
866
867     """
868     if context:
869       lock_monitor_cb = context.glm.AddToLockMonitor
870     else:
871       lock_monitor_cb = None
872
873     if address_list is None:
874       resolver = compat.partial(_SsconfResolver, True)
875     else:
876       # Caller provided an address list
877       resolver = _StaticResolver(address_list)
878
879     encoders = _ENCODERS.copy()
880
881     encoders.update({
882       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
883       })
884
885     _RpcClientBase.__init__(self, resolver, encoders.get,
886                             lock_monitor_cb=lock_monitor_cb,
887                             _req_process_fn=_req_process_fn)
888     _generated_rpc.RpcClientConfig.__init__(self)