Add cluster netmask parameter
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50
51 # pylint has a bug here, doesn't see this import
52 import ganeti.http.client  # pylint: disable=W0611
53
54
55 # Timeout for connecting to nodes (seconds)
56 _RPC_CONNECT_TIMEOUT = 5
57
58 _RPC_CLIENT_HEADERS = [
59   "Content-type: %s" % http.HTTP_APP_JSON,
60   "Expect:",
61   ]
62
63 # Various time constants for the timeout table
64 _TMO_URGENT = 60 # one minute
65 _TMO_FAST = 5 * 60 # five minutes
66 _TMO_NORMAL = 15 * 60 # 15 minutes
67 _TMO_SLOW = 3600 # one hour
68 _TMO_4HRS = 4 * 3600
69 _TMO_1DAY = 86400
70
71 # Timeout table that will be built later by decorators
72 # Guidelines for choosing timeouts:
73 # - call used during watcher: timeout -> 1min, _TMO_URGENT
74 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75 # - other calls: 15 min, _TMO_NORMAL
76 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77
78 _TIMEOUTS = {
79 }
80
81 #: Special value to describe an offline host
82 _OFFLINE = object()
83
84
85 def Init():
86   """Initializes the module-global HTTP client manager.
87
88   Must be called before using any RPC function and while exactly one thread is
89   running.
90
91   """
92   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93   # one thread running. This check is just a safety measure -- it doesn't
94   # cover all cases.
95   assert threading.activeCount() == 1, \
96          "Found more than one active thread when initializing pycURL"
97
98   logging.info("Using PycURL %s", pycurl.version)
99
100   pycurl.global_init(pycurl.GLOBAL_ALL)
101
102
103 def Shutdown():
104   """Stops the module-global HTTP client manager.
105
106   Must be called before quitting the program and while exactly one thread is
107   running.
108
109   """
110   pycurl.global_cleanup()
111
112
113 def _ConfigRpcCurl(curl):
114   noded_cert = str(constants.NODED_CERT_FILE)
115
116   curl.setopt(pycurl.FOLLOWLOCATION, False)
117   curl.setopt(pycurl.CAINFO, noded_cert)
118   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119   curl.setopt(pycurl.SSL_VERIFYPEER, True)
120   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121   curl.setopt(pycurl.SSLCERT, noded_cert)
122   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123   curl.setopt(pycurl.SSLKEY, noded_cert)
124   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125
126
127 # Aliasing this module avoids the following warning by epydoc: "Warning: No
128 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
129 _threading = threading
130
131
132 class _RpcThreadLocal(_threading.local):
133   def GetHttpClientPool(self):
134     """Returns a per-thread HTTP client pool.
135
136     @rtype: L{http.client.HttpClientPool}
137
138     """
139     try:
140       pool = self.hcp
141     except AttributeError:
142       pool = http.client.HttpClientPool(_ConfigRpcCurl)
143       self.hcp = pool
144
145     return pool
146
147
148 # Remove module alias (see above)
149 del _threading
150
151
152 _thread_local = _RpcThreadLocal()
153
154
155 def _RpcTimeout(secs):
156   """Timeout decorator.
157
158   When applied to a rpc call_* function, it updates the global timeout
159   table with the given function/timeout.
160
161   """
162   def decorator(f):
163     name = f.__name__
164     assert name.startswith("call_")
165     _TIMEOUTS[name[len("call_"):]] = secs
166     return f
167   return decorator
168
169
170 def RunWithRPC(fn):
171   """RPC-wrapper decorator.
172
173   When applied to a function, it runs it with the RPC system
174   initialized, and it shutsdown the system afterwards. This means the
175   function must be called without RPC being initialized.
176
177   """
178   def wrapper(*args, **kwargs):
179     Init()
180     try:
181       return fn(*args, **kwargs)
182     finally:
183       Shutdown()
184   return wrapper
185
186
187 def _Compress(data):
188   """Compresses a string for transport over RPC.
189
190   Small amounts of data are not compressed.
191
192   @type data: str
193   @param data: Data
194   @rtype: tuple
195   @return: Encoded data to send
196
197   """
198   # Small amounts of data are not compressed
199   if len(data) < 512:
200     return (constants.RPC_ENCODING_NONE, data)
201
202   # Compress with zlib and encode in base64
203   return (constants.RPC_ENCODING_ZLIB_BASE64,
204           base64.b64encode(zlib.compress(data, 3)))
205
206
207 class RpcResult(object):
208   """RPC Result class.
209
210   This class holds an RPC result. It is needed since in multi-node
211   calls we can't raise an exception just because one one out of many
212   failed, and therefore we use this class to encapsulate the result.
213
214   @ivar data: the data payload, for successful results, or None
215   @ivar call: the name of the RPC call
216   @ivar node: the name of the node to which we made the call
217   @ivar offline: whether the operation failed because the node was
218       offline, as opposed to actual failure; offline=True will always
219       imply failed=True, in order to allow simpler checking if
220       the user doesn't care about the exact failure mode
221   @ivar fail_msg: the error message if the call failed
222
223   """
224   def __init__(self, data=None, failed=False, offline=False,
225                call=None, node=None):
226     self.offline = offline
227     self.call = call
228     self.node = node
229
230     if offline:
231       self.fail_msg = "Node is marked offline"
232       self.data = self.payload = None
233     elif failed:
234       self.fail_msg = self._EnsureErr(data)
235       self.data = self.payload = None
236     else:
237       self.data = data
238       if not isinstance(self.data, (tuple, list)):
239         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
240                          type(self.data))
241         self.payload = None
242       elif len(data) != 2:
243         self.fail_msg = ("RPC layer error: invalid result length (%d), "
244                          "expected 2" % len(self.data))
245         self.payload = None
246       elif not self.data[0]:
247         self.fail_msg = self._EnsureErr(self.data[1])
248         self.payload = None
249       else:
250         # finally success
251         self.fail_msg = None
252         self.payload = data[1]
253
254     for attr_name in ["call", "data", "fail_msg",
255                       "node", "offline", "payload"]:
256       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
257
258   @staticmethod
259   def _EnsureErr(val):
260     """Helper to ensure we return a 'True' value for error."""
261     if val:
262       return val
263     else:
264       return "No error information"
265
266   def Raise(self, msg, prereq=False, ecode=None):
267     """If the result has failed, raise an OpExecError.
268
269     This is used so that LU code doesn't have to check for each
270     result, but instead can call this function.
271
272     """
273     if not self.fail_msg:
274       return
275
276     if not msg: # one could pass None for default message
277       msg = ("Call '%s' to node '%s' has failed: %s" %
278              (self.call, self.node, self.fail_msg))
279     else:
280       msg = "%s: %s" % (msg, self.fail_msg)
281     if prereq:
282       ec = errors.OpPrereqError
283     else:
284       ec = errors.OpExecError
285     if ecode is not None:
286       args = (msg, ecode)
287     else:
288       args = (msg, )
289     raise ec(*args) # pylint: disable=W0142
290
291
292 def _SsconfResolver(node_list,
293                     ssc=ssconf.SimpleStore,
294                     nslookup_fn=netutils.Hostname.GetIP):
295   """Return addresses for given node names.
296
297   @type node_list: list
298   @param node_list: List of node names
299   @type ssc: class
300   @param ssc: SimpleStore class that is used to obtain node->ip mappings
301   @type nslookup_fn: callable
302   @param nslookup_fn: function use to do NS lookup
303   @rtype: list of tuple; (string, string)
304   @return: List of tuples containing node name and IP address
305
306   """
307   ss = ssc()
308   iplist = ss.GetNodePrimaryIPList()
309   family = ss.GetPrimaryIPFamily()
310   ipmap = dict(entry.split() for entry in iplist)
311
312   result = []
313   for node in node_list:
314     ip = ipmap.get(node)
315     if ip is None:
316       ip = nslookup_fn(node, family=family)
317     result.append((node, ip))
318
319   return result
320
321
322 class _StaticResolver:
323   def __init__(self, addresses):
324     """Initializes this class.
325
326     """
327     self._addresses = addresses
328
329   def __call__(self, hosts):
330     """Returns static addresses for hosts.
331
332     """
333     assert len(hosts) == len(self._addresses)
334     return zip(hosts, self._addresses)
335
336
337 def _CheckConfigNode(name, node):
338   """Checks if a node is online.
339
340   @type name: string
341   @param name: Node name
342   @type node: L{objects.Node} or None
343   @param node: Node object
344
345   """
346   if node is None:
347     # Depend on DNS for name resolution
348     ip = name
349   elif node.offline:
350     ip = _OFFLINE
351   else:
352     ip = node.primary_ip
353   return (name, ip)
354
355
356 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357   """Calculate node addresses using configuration.
358
359   """
360   # Special case for single-host lookups
361   if len(hosts) == 1:
362     (name, ) = hosts
363     return [_CheckConfigNode(name, single_node_fn(name))]
364   else:
365     all_nodes = all_nodes_fn()
366     return [_CheckConfigNode(name, all_nodes.get(name, None))
367             for name in hosts]
368
369
370 class _RpcProcessor:
371   def __init__(self, resolver, port, lock_monitor_cb=None):
372     """Initializes this class.
373
374     @param resolver: callable accepting a list of hostnames, returning a list
375       of tuples containing name and IP address (IP address can be the name or
376       the special value L{_OFFLINE} to mark offline machines)
377     @type port: int
378     @param port: TCP port
379     @param lock_monitor_cb: Callable for registering with lock monitor
380
381     """
382     self._resolver = resolver
383     self._port = port
384     self._lock_monitor_cb = lock_monitor_cb
385
386   @staticmethod
387   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
388     """Prepares requests by sorting offline hosts into separate list.
389
390     """
391     results = {}
392     requests = {}
393
394     for (name, ip) in hosts:
395       if ip is _OFFLINE:
396         # Node is marked as offline
397         results[name] = RpcResult(node=name, offline=True, call=procedure)
398       else:
399         requests[name] = \
400           http.client.HttpClientRequest(str(ip), port,
401                                         http.HTTP_PUT, str("/%s" % procedure),
402                                         headers=_RPC_CLIENT_HEADERS,
403                                         post_data=body,
404                                         read_timeout=read_timeout,
405                                         nicename="%s/%s" % (name, procedure))
406
407     return (results, requests)
408
409   @staticmethod
410   def _CombineResults(results, requests, procedure):
411     """Combines pre-computed results for offline hosts with actual call results.
412
413     """
414     for name, req in requests.items():
415       if req.success and req.resp_status_code == http.HTTP_OK:
416         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
417                                 node=name, call=procedure)
418       else:
419         # TODO: Better error reporting
420         if req.error:
421           msg = req.error
422         else:
423           msg = req.resp_body
424
425         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
426         host_result = RpcResult(data=msg, failed=True, node=name,
427                                 call=procedure)
428
429       results[name] = host_result
430
431     return results
432
433   def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
434     """Makes an RPC request to a number of nodes.
435
436     @type hosts: sequence
437     @param hosts: Hostnames
438     @type procedure: string
439     @param procedure: Request path
440     @type body: string
441     @param body: Request body
442     @type read_timeout: int or None
443     @param read_timeout: Read timeout for request
444
445     """
446     assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
447
448     if not http_pool:
449       http_pool = _thread_local.GetHttpClientPool()
450
451     if read_timeout is None:
452       read_timeout = _TIMEOUTS[procedure]
453
454     (results, requests) = \
455       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
456                             str(body), read_timeout)
457
458     http_pool.ProcessRequests(requests.values(),
459                               lock_monitor_cb=self._lock_monitor_cb)
460
461     assert not frozenset(results).intersection(requests)
462
463     return self._CombineResults(results, requests, procedure)
464
465
466 def _EncodeImportExportIO(ieio, ieioargs):
467   """Encodes import/export I/O information.
468
469   """
470   if ieio == constants.IEIO_RAW_DISK:
471     assert len(ieioargs) == 1
472     return (ieioargs[0].ToDict(), )
473
474   if ieio == constants.IEIO_SCRIPT:
475     assert len(ieioargs) == 2
476     return (ieioargs[0].ToDict(), ieioargs[1])
477
478   return ieioargs
479
480
481 class RpcRunner(object):
482   """RPC runner class.
483
484   """
485   def __init__(self, context):
486     """Initialized the RPC runner.
487
488     @type context: C{masterd.GanetiContext}
489     @param context: Ganeti context
490
491     """
492     self._cfg = context.cfg
493     self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
494                                               self._cfg.GetNodeInfo,
495                                               self._cfg.GetAllNodesInfo),
496                                netutils.GetDaemonPort(constants.NODED),
497                                lock_monitor_cb=context.glm.AddToLockMonitor)
498
499   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
500     """Convert the given instance to a dict.
501
502     This is done via the instance's ToDict() method and additionally
503     we fill the hvparams with the cluster defaults.
504
505     @type instance: L{objects.Instance}
506     @param instance: an Instance object
507     @type hvp: dict or None
508     @param hvp: a dictionary with overridden hypervisor parameters
509     @type bep: dict or None
510     @param bep: a dictionary with overridden backend parameters
511     @type osp: dict or None
512     @param osp: a dictionary with overridden os parameters
513     @rtype: dict
514     @return: the instance dict, with the hvparams filled with the
515         cluster defaults
516
517     """
518     idict = instance.ToDict()
519     cluster = self._cfg.GetClusterInfo()
520     idict["hvparams"] = cluster.FillHV(instance)
521     if hvp is not None:
522       idict["hvparams"].update(hvp)
523     idict["beparams"] = cluster.FillBE(instance)
524     if bep is not None:
525       idict["beparams"].update(bep)
526     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
527     if osp is not None:
528       idict["osparams"].update(osp)
529     for nic in idict["nics"]:
530       nic['nicparams'] = objects.FillDict(
531         cluster.nicparams[constants.PP_DEFAULT],
532         nic['nicparams'])
533     return idict
534
535   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
536     """Helper for making a multi-node call
537
538     """
539     body = serializer.DumpJson(args, indent=False)
540     return self._proc(node_list, procedure, body, read_timeout=read_timeout)
541
542   @staticmethod
543   def _StaticMultiNodeCall(node_list, procedure, args,
544                            address_list=None, read_timeout=None):
545     """Helper for making a multi-node static call
546
547     """
548     body = serializer.DumpJson(args, indent=False)
549
550     if address_list is None:
551       resolver = _SsconfResolver
552     else:
553       # Caller provided an address list
554       resolver = _StaticResolver(address_list)
555
556     proc = _RpcProcessor(resolver,
557                          netutils.GetDaemonPort(constants.NODED))
558     return proc(node_list, procedure, body, read_timeout=read_timeout)
559
560   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
561     """Helper for making a single-node call
562
563     """
564     body = serializer.DumpJson(args, indent=False)
565     return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
566
567   @classmethod
568   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
569     """Helper for making a single-node static call
570
571     """
572     body = serializer.DumpJson(args, indent=False)
573     proc = _RpcProcessor(_SsconfResolver,
574                          netutils.GetDaemonPort(constants.NODED))
575     return proc([node], procedure, body, read_timeout=read_timeout)[node]
576
577   #
578   # Begin RPC calls
579   #
580
581   @_RpcTimeout(_TMO_URGENT)
582   def call_bdev_sizes(self, node_list, devices):
583     """Gets the sizes of requested block devices present on a node
584
585     This is a multi-node call.
586
587     """
588     return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
589
590   @_RpcTimeout(_TMO_URGENT)
591   def call_lv_list(self, node_list, vg_name):
592     """Gets the logical volumes present in a given volume group.
593
594     This is a multi-node call.
595
596     """
597     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
598
599   @_RpcTimeout(_TMO_URGENT)
600   def call_vg_list(self, node_list):
601     """Gets the volume group list.
602
603     This is a multi-node call.
604
605     """
606     return self._MultiNodeCall(node_list, "vg_list", [])
607
608   @_RpcTimeout(_TMO_NORMAL)
609   def call_storage_list(self, node_list, su_name, su_args, name, fields):
610     """Get list of storage units.
611
612     This is a multi-node call.
613
614     """
615     return self._MultiNodeCall(node_list, "storage_list",
616                                [su_name, su_args, name, fields])
617
618   @_RpcTimeout(_TMO_NORMAL)
619   def call_storage_modify(self, node, su_name, su_args, name, changes):
620     """Modify a storage unit.
621
622     This is a single-node call.
623
624     """
625     return self._SingleNodeCall(node, "storage_modify",
626                                 [su_name, su_args, name, changes])
627
628   @_RpcTimeout(_TMO_NORMAL)
629   def call_storage_execute(self, node, su_name, su_args, name, op):
630     """Executes an operation on a storage unit.
631
632     This is a single-node call.
633
634     """
635     return self._SingleNodeCall(node, "storage_execute",
636                                 [su_name, su_args, name, op])
637
638   @_RpcTimeout(_TMO_URGENT)
639   def call_bridges_exist(self, node, bridges_list):
640     """Checks if a node has all the bridges given.
641
642     This method checks if all bridges given in the bridges_list are
643     present on the remote node, so that an instance that uses interfaces
644     on those bridges can be started.
645
646     This is a single-node call.
647
648     """
649     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
650
651   @_RpcTimeout(_TMO_NORMAL)
652   def call_instance_start(self, node, instance, hvp, bep, startup_paused):
653     """Starts an instance.
654
655     This is a single-node call.
656
657     """
658     idict = self._InstDict(instance, hvp=hvp, bep=bep)
659     return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
660
661   @_RpcTimeout(_TMO_NORMAL)
662   def call_instance_shutdown(self, node, instance, timeout):
663     """Stops an instance.
664
665     This is a single-node call.
666
667     """
668     return self._SingleNodeCall(node, "instance_shutdown",
669                                 [self._InstDict(instance), timeout])
670
671   @_RpcTimeout(_TMO_NORMAL)
672   def call_migration_info(self, node, instance):
673     """Gather the information necessary to prepare an instance migration.
674
675     This is a single-node call.
676
677     @type node: string
678     @param node: the node on which the instance is currently running
679     @type instance: C{objects.Instance}
680     @param instance: the instance definition
681
682     """
683     return self._SingleNodeCall(node, "migration_info",
684                                 [self._InstDict(instance)])
685
686   @_RpcTimeout(_TMO_NORMAL)
687   def call_accept_instance(self, node, instance, info, target):
688     """Prepare a node to accept an instance.
689
690     This is a single-node call.
691
692     @type node: string
693     @param node: the target node for the migration
694     @type instance: C{objects.Instance}
695     @param instance: the instance definition
696     @type info: opaque/hypervisor specific (string/data)
697     @param info: result for the call_migration_info call
698     @type target: string
699     @param target: target hostname (usually ip address) (on the node itself)
700
701     """
702     return self._SingleNodeCall(node, "accept_instance",
703                                 [self._InstDict(instance), info, target])
704
705   @_RpcTimeout(_TMO_NORMAL)
706   def call_instance_finalize_migration_dst(self, node, instance, info, success):
707     """Finalize any target-node migration specific operation.
708
709     This is called both in case of a successful migration and in case of error
710     (in which case it should abort the migration).
711
712     This is a single-node call.
713
714     @type node: string
715     @param node: the target node for the migration
716     @type instance: C{objects.Instance}
717     @param instance: the instance definition
718     @type info: opaque/hypervisor specific (string/data)
719     @param info: result for the call_migration_info call
720     @type success: boolean
721     @param success: whether the migration was a success or a failure
722
723     """
724     return self._SingleNodeCall(node, "instance_finalize_migration_dst",
725                                 [self._InstDict(instance), info, success])
726
727   @_RpcTimeout(_TMO_SLOW)
728   def call_instance_migrate(self, node, instance, target, live):
729     """Migrate an instance.
730
731     This is a single-node call.
732
733     @type node: string
734     @param node: the node on which the instance is currently running
735     @type instance: C{objects.Instance}
736     @param instance: the instance definition
737     @type target: string
738     @param target: the target node name
739     @type live: boolean
740     @param live: whether the migration should be done live or not (the
741         interpretation of this parameter is left to the hypervisor)
742
743     """
744     return self._SingleNodeCall(node, "instance_migrate",
745                                 [self._InstDict(instance), target, live])
746
747   @_RpcTimeout(_TMO_SLOW)
748   def call_instance_finalize_migration_src(self, node, instance, success, live):
749     """Finalize the instance migration on the source node.
750
751     This is a single-node call.
752
753     @type instance: L{objects.Instance}
754     @param instance: the instance that was migrated
755     @type success: bool
756     @param success: whether the migration succeeded or not
757     @type live: bool
758     @param live: whether the user requested a live migration or not
759
760     """
761     return self._SingleNodeCall(node, "instance_finalize_migration_src",
762                                 [self._InstDict(instance), success, live])
763
764   @_RpcTimeout(_TMO_SLOW)
765   def call_instance_get_migration_status(self, node, instance):
766     """Report migration status.
767
768     This is a single-node call that must be executed on the source node.
769
770     @type instance: L{objects.Instance}
771     @param instance: the instance that is being migrated
772     @rtype: L{objects.MigrationStatus}
773     @return: the status of the current migration (one of
774              L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
775              progress info that can be retrieved from the hypervisor
776
777     """
778     result = self._SingleNodeCall(node, "instance_get_migration_status",
779                                   [self._InstDict(instance)])
780     if not result.fail_msg and result.payload is not None:
781       result.payload = objects.MigrationStatus.FromDict(result.payload)
782     return result
783
784   @_RpcTimeout(_TMO_NORMAL)
785   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
786     """Reboots an instance.
787
788     This is a single-node call.
789
790     """
791     return self._SingleNodeCall(node, "instance_reboot",
792                                 [self._InstDict(inst), reboot_type,
793                                  shutdown_timeout])
794
795   @_RpcTimeout(_TMO_1DAY)
796   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
797     """Installs an OS on the given instance.
798
799     This is a single-node call.
800
801     """
802     return self._SingleNodeCall(node, "instance_os_add",
803                                 [self._InstDict(inst, osp=osparams),
804                                  reinstall, debug])
805
806   @_RpcTimeout(_TMO_SLOW)
807   def call_instance_run_rename(self, node, inst, old_name, debug):
808     """Run the OS rename script for an instance.
809
810     This is a single-node call.
811
812     """
813     return self._SingleNodeCall(node, "instance_run_rename",
814                                 [self._InstDict(inst), old_name, debug])
815
816   @_RpcTimeout(_TMO_URGENT)
817   def call_instance_info(self, node, instance, hname):
818     """Returns information about a single instance.
819
820     This is a single-node call.
821
822     @type node: list
823     @param node: the list of nodes to query
824     @type instance: string
825     @param instance: the instance name
826     @type hname: string
827     @param hname: the hypervisor type of the instance
828
829     """
830     return self._SingleNodeCall(node, "instance_info", [instance, hname])
831
832   @_RpcTimeout(_TMO_NORMAL)
833   def call_instance_migratable(self, node, instance):
834     """Checks whether the given instance can be migrated.
835
836     This is a single-node call.
837
838     @param node: the node to query
839     @type instance: L{objects.Instance}
840     @param instance: the instance to check
841
842
843     """
844     return self._SingleNodeCall(node, "instance_migratable",
845                                 [self._InstDict(instance)])
846
847   @_RpcTimeout(_TMO_URGENT)
848   def call_all_instances_info(self, node_list, hypervisor_list):
849     """Returns information about all instances on the given nodes.
850
851     This is a multi-node call.
852
853     @type node_list: list
854     @param node_list: the list of nodes to query
855     @type hypervisor_list: list
856     @param hypervisor_list: the hypervisors to query for instances
857
858     """
859     return self._MultiNodeCall(node_list, "all_instances_info",
860                                [hypervisor_list])
861
862   @_RpcTimeout(_TMO_URGENT)
863   def call_instance_list(self, node_list, hypervisor_list):
864     """Returns the list of running instances on a given node.
865
866     This is a multi-node call.
867
868     @type node_list: list
869     @param node_list: the list of nodes to query
870     @type hypervisor_list: list
871     @param hypervisor_list: the hypervisors to query for instances
872
873     """
874     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
875
876   @_RpcTimeout(_TMO_FAST)
877   def call_node_tcp_ping(self, node, source, target, port, timeout,
878                          live_port_needed):
879     """Do a TcpPing on the remote node
880
881     This is a single-node call.
882
883     """
884     return self._SingleNodeCall(node, "node_tcp_ping",
885                                 [source, target, port, timeout,
886                                  live_port_needed])
887
888   @_RpcTimeout(_TMO_FAST)
889   def call_node_has_ip_address(self, node, address):
890     """Checks if a node has the given IP address.
891
892     This is a single-node call.
893
894     """
895     return self._SingleNodeCall(node, "node_has_ip_address", [address])
896
897   @_RpcTimeout(_TMO_URGENT)
898   def call_node_info(self, node_list, vg_name, hypervisor_type):
899     """Return node information.
900
901     This will return memory information and volume group size and free
902     space.
903
904     This is a multi-node call.
905
906     @type node_list: list
907     @param node_list: the list of nodes to query
908     @type vg_name: C{string}
909     @param vg_name: the name of the volume group to ask for disk space
910         information
911     @type hypervisor_type: C{str}
912     @param hypervisor_type: the name of the hypervisor to ask for
913         memory information
914
915     """
916     return self._MultiNodeCall(node_list, "node_info",
917                                [vg_name, hypervisor_type])
918
919   @_RpcTimeout(_TMO_NORMAL)
920   def call_etc_hosts_modify(self, node, mode, name, ip):
921     """Modify hosts file with name
922
923     @type node: string
924     @param node: The node to call
925     @type mode: string
926     @param mode: The mode to operate. Currently "add" or "remove"
927     @type name: string
928     @param name: The host name to be modified
929     @type ip: string
930     @param ip: The ip of the entry (just valid if mode is "add")
931
932     """
933     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
934
935   @_RpcTimeout(_TMO_NORMAL)
936   def call_node_verify(self, node_list, checkdict, cluster_name):
937     """Request verification of given parameters.
938
939     This is a multi-node call.
940
941     """
942     return self._MultiNodeCall(node_list, "node_verify",
943                                [checkdict, cluster_name])
944
945   @classmethod
946   @_RpcTimeout(_TMO_FAST)
947   def call_node_start_master_daemons(cls, node, no_voting):
948     """Starts master daemons on a node.
949
950     This is a single-node call.
951
952     """
953     return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
954                                      [no_voting])
955
956   @classmethod
957   @_RpcTimeout(_TMO_FAST)
958   def call_node_activate_master_ip(cls, node):
959     """Activates master IP on a node.
960
961     This is a single-node call.
962
963     """
964     return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
965
966   @classmethod
967   @_RpcTimeout(_TMO_FAST)
968   def call_node_stop_master(cls, node):
969     """Deactivates master IP and stops master daemons on a node.
970
971     This is a single-node call.
972
973     """
974     return cls._StaticSingleNodeCall(node, "node_stop_master", [])
975
976   @classmethod
977   @_RpcTimeout(_TMO_FAST)
978   def call_node_deactivate_master_ip(cls, node):
979     """Deactivates master IP on a node.
980
981     This is a single-node call.
982
983     """
984     return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
985
986   @classmethod
987   @_RpcTimeout(_TMO_FAST)
988   def call_node_change_master_netmask(cls, node, netmask):
989     """Change master IP netmask.
990
991     This is a single-node call.
992
993     """
994     return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
995                   [netmask])
996
997   @classmethod
998   @_RpcTimeout(_TMO_URGENT)
999   def call_master_info(cls, node_list):
1000     """Query master info.
1001
1002     This is a multi-node call.
1003
1004     """
1005     # TODO: should this method query down nodes?
1006     return cls._StaticMultiNodeCall(node_list, "master_info", [])
1007
1008   @classmethod
1009   @_RpcTimeout(_TMO_URGENT)
1010   def call_version(cls, node_list):
1011     """Query node version.
1012
1013     This is a multi-node call.
1014
1015     """
1016     return cls._StaticMultiNodeCall(node_list, "version", [])
1017
1018   @_RpcTimeout(_TMO_NORMAL)
1019   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
1020     """Request creation of a given block device.
1021
1022     This is a single-node call.
1023
1024     """
1025     return self._SingleNodeCall(node, "blockdev_create",
1026                                 [bdev.ToDict(), size, owner, on_primary, info])
1027
1028   @_RpcTimeout(_TMO_SLOW)
1029   def call_blockdev_wipe(self, node, bdev, offset, size):
1030     """Request wipe at given offset with given size of a block device.
1031
1032     This is a single-node call.
1033
1034     """
1035     return self._SingleNodeCall(node, "blockdev_wipe",
1036                                 [bdev.ToDict(), offset, size])
1037
1038   @_RpcTimeout(_TMO_NORMAL)
1039   def call_blockdev_remove(self, node, bdev):
1040     """Request removal of a given block device.
1041
1042     This is a single-node call.
1043
1044     """
1045     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1046
1047   @_RpcTimeout(_TMO_NORMAL)
1048   def call_blockdev_rename(self, node, devlist):
1049     """Request rename of the given block devices.
1050
1051     This is a single-node call.
1052
1053     """
1054     return self._SingleNodeCall(node, "blockdev_rename",
1055                                 [(d.ToDict(), uid) for d, uid in devlist])
1056
1057   @_RpcTimeout(_TMO_NORMAL)
1058   def call_blockdev_pause_resume_sync(self, node, disks, pause):
1059     """Request a pause/resume of given block device.
1060
1061     This is a single-node call.
1062
1063     """
1064     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1065                                 [[bdev.ToDict() for bdev in disks], pause])
1066
1067   @_RpcTimeout(_TMO_NORMAL)
1068   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1069     """Request assembling of a given block device.
1070
1071     This is a single-node call.
1072
1073     """
1074     return self._SingleNodeCall(node, "blockdev_assemble",
1075                                 [disk.ToDict(), owner, on_primary, idx])
1076
1077   @_RpcTimeout(_TMO_NORMAL)
1078   def call_blockdev_shutdown(self, node, disk):
1079     """Request shutdown of a given block device.
1080
1081     This is a single-node call.
1082
1083     """
1084     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1085
1086   @_RpcTimeout(_TMO_NORMAL)
1087   def call_blockdev_addchildren(self, node, bdev, ndevs):
1088     """Request adding a list of children to a (mirroring) device.
1089
1090     This is a single-node call.
1091
1092     """
1093     return self._SingleNodeCall(node, "blockdev_addchildren",
1094                                 [bdev.ToDict(),
1095                                  [disk.ToDict() for disk in ndevs]])
1096
1097   @_RpcTimeout(_TMO_NORMAL)
1098   def call_blockdev_removechildren(self, node, bdev, ndevs):
1099     """Request removing a list of children from a (mirroring) device.
1100
1101     This is a single-node call.
1102
1103     """
1104     return self._SingleNodeCall(node, "blockdev_removechildren",
1105                                 [bdev.ToDict(),
1106                                  [disk.ToDict() for disk in ndevs]])
1107
1108   @_RpcTimeout(_TMO_NORMAL)
1109   def call_blockdev_getmirrorstatus(self, node, disks):
1110     """Request status of a (mirroring) device.
1111
1112     This is a single-node call.
1113
1114     """
1115     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1116                                   [dsk.ToDict() for dsk in disks])
1117     if not result.fail_msg:
1118       result.payload = [objects.BlockDevStatus.FromDict(i)
1119                         for i in result.payload]
1120     return result
1121
1122   @_RpcTimeout(_TMO_NORMAL)
1123   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1124     """Request status of (mirroring) devices from multiple nodes.
1125
1126     This is a multi-node call.
1127
1128     """
1129     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1130                                  [dict((name, [dsk.ToDict() for dsk in disks])
1131                                        for name, disks in node_disks.items())])
1132     for nres in result.values():
1133       if nres.fail_msg:
1134         continue
1135
1136       for idx, (success, status) in enumerate(nres.payload):
1137         if success:
1138           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1139
1140     return result
1141
1142   @_RpcTimeout(_TMO_NORMAL)
1143   def call_blockdev_find(self, node, disk):
1144     """Request identification of a given block device.
1145
1146     This is a single-node call.
1147
1148     """
1149     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1150     if not result.fail_msg and result.payload is not None:
1151       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1152     return result
1153
1154   @_RpcTimeout(_TMO_NORMAL)
1155   def call_blockdev_close(self, node, instance_name, disks):
1156     """Closes the given block devices.
1157
1158     This is a single-node call.
1159
1160     """
1161     params = [instance_name, [cf.ToDict() for cf in disks]]
1162     return self._SingleNodeCall(node, "blockdev_close", params)
1163
1164   @_RpcTimeout(_TMO_NORMAL)
1165   def call_blockdev_getsize(self, node, disks):
1166     """Returns the size of the given disks.
1167
1168     This is a single-node call.
1169
1170     """
1171     params = [[cf.ToDict() for cf in disks]]
1172     return self._SingleNodeCall(node, "blockdev_getsize", params)
1173
1174   @_RpcTimeout(_TMO_NORMAL)
1175   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1176     """Disconnects the network of the given drbd devices.
1177
1178     This is a multi-node call.
1179
1180     """
1181     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1182                                [nodes_ip, [cf.ToDict() for cf in disks]])
1183
1184   @_RpcTimeout(_TMO_NORMAL)
1185   def call_drbd_attach_net(self, node_list, nodes_ip,
1186                            disks, instance_name, multimaster):
1187     """Disconnects the given drbd devices.
1188
1189     This is a multi-node call.
1190
1191     """
1192     return self._MultiNodeCall(node_list, "drbd_attach_net",
1193                                [nodes_ip, [cf.ToDict() for cf in disks],
1194                                 instance_name, multimaster])
1195
1196   @_RpcTimeout(_TMO_SLOW)
1197   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1198     """Waits for the synchronization of drbd devices is complete.
1199
1200     This is a multi-node call.
1201
1202     """
1203     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1204                                [nodes_ip, [cf.ToDict() for cf in disks]])
1205
1206   @_RpcTimeout(_TMO_URGENT)
1207   def call_drbd_helper(self, node_list):
1208     """Gets drbd helper.
1209
1210     This is a multi-node call.
1211
1212     """
1213     return self._MultiNodeCall(node_list, "drbd_helper", [])
1214
1215   @classmethod
1216   @_RpcTimeout(_TMO_NORMAL)
1217   def call_upload_file(cls, node_list, file_name, address_list=None):
1218     """Upload a file.
1219
1220     The node will refuse the operation in case the file is not on the
1221     approved file list.
1222
1223     This is a multi-node call.
1224
1225     @type node_list: list
1226     @param node_list: the list of node names to upload to
1227     @type file_name: str
1228     @param file_name: the filename to upload
1229     @type address_list: list or None
1230     @keyword address_list: an optional list of node addresses, in order
1231         to optimize the RPC speed
1232
1233     """
1234     file_contents = utils.ReadFile(file_name)
1235     data = _Compress(file_contents)
1236     st = os.stat(file_name)
1237     getents = runtime.GetEnts()
1238     params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1239               getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1240     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1241                                     address_list=address_list)
1242
1243   @classmethod
1244   @_RpcTimeout(_TMO_NORMAL)
1245   def call_write_ssconf_files(cls, node_list, values):
1246     """Write ssconf files.
1247
1248     This is a multi-node call.
1249
1250     """
1251     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1252
1253   @_RpcTimeout(_TMO_NORMAL)
1254   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1255     """Runs OOB.
1256
1257     This is a single-node call.
1258
1259     """
1260     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1261                                                   remote_node, timeout])
1262
1263   @_RpcTimeout(_TMO_FAST)
1264   def call_os_diagnose(self, node_list):
1265     """Request a diagnose of OS definitions.
1266
1267     This is a multi-node call.
1268
1269     """
1270     return self._MultiNodeCall(node_list, "os_diagnose", [])
1271
1272   @_RpcTimeout(_TMO_FAST)
1273   def call_os_get(self, node, name):
1274     """Returns an OS definition.
1275
1276     This is a single-node call.
1277
1278     """
1279     result = self._SingleNodeCall(node, "os_get", [name])
1280     if not result.fail_msg and isinstance(result.payload, dict):
1281       result.payload = objects.OS.FromDict(result.payload)
1282     return result
1283
1284   @_RpcTimeout(_TMO_FAST)
1285   def call_os_validate(self, required, nodes, name, checks, params):
1286     """Run a validation routine for a given OS.
1287
1288     This is a multi-node call.
1289
1290     """
1291     return self._MultiNodeCall(nodes, "os_validate",
1292                                [required, name, checks, params])
1293
1294   @_RpcTimeout(_TMO_NORMAL)
1295   def call_hooks_runner(self, node_list, hpath, phase, env):
1296     """Call the hooks runner.
1297
1298     Args:
1299       - op: the OpCode instance
1300       - env: a dictionary with the environment
1301
1302     This is a multi-node call.
1303
1304     """
1305     params = [hpath, phase, env]
1306     return self._MultiNodeCall(node_list, "hooks_runner", params)
1307
1308   @_RpcTimeout(_TMO_NORMAL)
1309   def call_iallocator_runner(self, node, name, idata):
1310     """Call an iallocator on a remote node
1311
1312     Args:
1313       - name: the iallocator name
1314       - input: the json-encoded input string
1315
1316     This is a single-node call.
1317
1318     """
1319     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1320
1321   @_RpcTimeout(_TMO_NORMAL)
1322   def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1323     """Request a snapshot of the given block device.
1324
1325     This is a single-node call.
1326
1327     """
1328     return self._SingleNodeCall(node, "blockdev_grow",
1329                                 [cf_bdev.ToDict(), amount, dryrun])
1330
1331   @_RpcTimeout(_TMO_1DAY)
1332   def call_blockdev_export(self, node, cf_bdev,
1333                            dest_node, dest_path, cluster_name):
1334     """Export a given disk to another node.
1335
1336     This is a single-node call.
1337
1338     """
1339     return self._SingleNodeCall(node, "blockdev_export",
1340                                 [cf_bdev.ToDict(), dest_node, dest_path,
1341                                  cluster_name])
1342
1343   @_RpcTimeout(_TMO_NORMAL)
1344   def call_blockdev_snapshot(self, node, cf_bdev):
1345     """Request a snapshot of the given block device.
1346
1347     This is a single-node call.
1348
1349     """
1350     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1351
1352   @_RpcTimeout(_TMO_NORMAL)
1353   def call_finalize_export(self, node, instance, snap_disks):
1354     """Request the completion of an export operation.
1355
1356     This writes the export config file, etc.
1357
1358     This is a single-node call.
1359
1360     """
1361     flat_disks = []
1362     for disk in snap_disks:
1363       if isinstance(disk, bool):
1364         flat_disks.append(disk)
1365       else:
1366         flat_disks.append(disk.ToDict())
1367
1368     return self._SingleNodeCall(node, "finalize_export",
1369                                 [self._InstDict(instance), flat_disks])
1370
1371   @_RpcTimeout(_TMO_FAST)
1372   def call_export_info(self, node, path):
1373     """Queries the export information in a given path.
1374
1375     This is a single-node call.
1376
1377     """
1378     return self._SingleNodeCall(node, "export_info", [path])
1379
1380   @_RpcTimeout(_TMO_FAST)
1381   def call_export_list(self, node_list):
1382     """Gets the stored exports list.
1383
1384     This is a multi-node call.
1385
1386     """
1387     return self._MultiNodeCall(node_list, "export_list", [])
1388
1389   @_RpcTimeout(_TMO_FAST)
1390   def call_export_remove(self, node, export):
1391     """Requests removal of a given export.
1392
1393     This is a single-node call.
1394
1395     """
1396     return self._SingleNodeCall(node, "export_remove", [export])
1397
1398   @classmethod
1399   @_RpcTimeout(_TMO_NORMAL)
1400   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1401     """Requests a node to clean the cluster information it has.
1402
1403     This will remove the configuration information from the ganeti data
1404     dir.
1405
1406     This is a single-node call.
1407
1408     """
1409     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1410                                      [modify_ssh_setup])
1411
1412   @_RpcTimeout(_TMO_FAST)
1413   def call_node_volumes(self, node_list):
1414     """Gets all volumes on node(s).
1415
1416     This is a multi-node call.
1417
1418     """
1419     return self._MultiNodeCall(node_list, "node_volumes", [])
1420
1421   @_RpcTimeout(_TMO_FAST)
1422   def call_node_demote_from_mc(self, node):
1423     """Demote a node from the master candidate role.
1424
1425     This is a single-node call.
1426
1427     """
1428     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1429
1430   @_RpcTimeout(_TMO_NORMAL)
1431   def call_node_powercycle(self, node, hypervisor):
1432     """Tries to powercycle a node.
1433
1434     This is a single-node call.
1435
1436     """
1437     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1438
1439   @_RpcTimeout(None)
1440   def call_test_delay(self, node_list, duration):
1441     """Sleep for a fixed time on given node(s).
1442
1443     This is a multi-node call.
1444
1445     """
1446     return self._MultiNodeCall(node_list, "test_delay", [duration],
1447                                read_timeout=int(duration + 5))
1448
1449   @_RpcTimeout(_TMO_FAST)
1450   def call_file_storage_dir_create(self, node, file_storage_dir):
1451     """Create the given file storage directory.
1452
1453     This is a single-node call.
1454
1455     """
1456     return self._SingleNodeCall(node, "file_storage_dir_create",
1457                                 [file_storage_dir])
1458
1459   @_RpcTimeout(_TMO_FAST)
1460   def call_file_storage_dir_remove(self, node, file_storage_dir):
1461     """Remove the given file storage directory.
1462
1463     This is a single-node call.
1464
1465     """
1466     return self._SingleNodeCall(node, "file_storage_dir_remove",
1467                                 [file_storage_dir])
1468
1469   @_RpcTimeout(_TMO_FAST)
1470   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1471                                    new_file_storage_dir):
1472     """Rename file storage directory.
1473
1474     This is a single-node call.
1475
1476     """
1477     return self._SingleNodeCall(node, "file_storage_dir_rename",
1478                                 [old_file_storage_dir, new_file_storage_dir])
1479
1480   @classmethod
1481   @_RpcTimeout(_TMO_URGENT)
1482   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1483     """Update job queue.
1484
1485     This is a multi-node call.
1486
1487     """
1488     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1489                                     [file_name, _Compress(content)],
1490                                     address_list=address_list)
1491
1492   @classmethod
1493   @_RpcTimeout(_TMO_NORMAL)
1494   def call_jobqueue_purge(cls, node):
1495     """Purge job queue.
1496
1497     This is a single-node call.
1498
1499     """
1500     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1501
1502   @classmethod
1503   @_RpcTimeout(_TMO_URGENT)
1504   def call_jobqueue_rename(cls, node_list, address_list, rename):
1505     """Rename a job queue file.
1506
1507     This is a multi-node call.
1508
1509     """
1510     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1511                                     address_list=address_list)
1512
1513   @_RpcTimeout(_TMO_NORMAL)
1514   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1515     """Validate the hypervisor params.
1516
1517     This is a multi-node call.
1518
1519     @type node_list: list
1520     @param node_list: the list of nodes to query
1521     @type hvname: string
1522     @param hvname: the hypervisor name
1523     @type hvparams: dict
1524     @param hvparams: the hypervisor parameters to be validated
1525
1526     """
1527     cluster = self._cfg.GetClusterInfo()
1528     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1529     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1530                                [hvname, hv_full])
1531
1532   @_RpcTimeout(_TMO_NORMAL)
1533   def call_x509_cert_create(self, node, validity):
1534     """Creates a new X509 certificate for SSL/TLS.
1535
1536     This is a single-node call.
1537
1538     @type validity: int
1539     @param validity: Validity in seconds
1540
1541     """
1542     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1543
1544   @_RpcTimeout(_TMO_NORMAL)
1545   def call_x509_cert_remove(self, node, name):
1546     """Removes a X509 certificate.
1547
1548     This is a single-node call.
1549
1550     @type name: string
1551     @param name: Certificate name
1552
1553     """
1554     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1555
1556   @_RpcTimeout(_TMO_NORMAL)
1557   def call_import_start(self, node, opts, instance, component,
1558                         dest, dest_args):
1559     """Starts a listener for an import.
1560
1561     This is a single-node call.
1562
1563     @type node: string
1564     @param node: Node name
1565     @type instance: C{objects.Instance}
1566     @param instance: Instance object
1567     @type component: string
1568     @param component: which part of the instance is being imported
1569
1570     """
1571     return self._SingleNodeCall(node, "import_start",
1572                                 [opts.ToDict(),
1573                                  self._InstDict(instance), component, dest,
1574                                  _EncodeImportExportIO(dest, dest_args)])
1575
1576   @_RpcTimeout(_TMO_NORMAL)
1577   def call_export_start(self, node, opts, host, port,
1578                         instance, component, source, source_args):
1579     """Starts an export daemon.
1580
1581     This is a single-node call.
1582
1583     @type node: string
1584     @param node: Node name
1585     @type instance: C{objects.Instance}
1586     @param instance: Instance object
1587     @type component: string
1588     @param component: which part of the instance is being imported
1589
1590     """
1591     return self._SingleNodeCall(node, "export_start",
1592                                 [opts.ToDict(), host, port,
1593                                  self._InstDict(instance),
1594                                  component, source,
1595                                  _EncodeImportExportIO(source, source_args)])
1596
1597   @_RpcTimeout(_TMO_FAST)
1598   def call_impexp_status(self, node, names):
1599     """Gets the status of an import or export.
1600
1601     This is a single-node call.
1602
1603     @type node: string
1604     @param node: Node name
1605     @type names: List of strings
1606     @param names: Import/export names
1607     @rtype: List of L{objects.ImportExportStatus} instances
1608     @return: Returns a list of the state of each named import/export or None if
1609              a status couldn't be retrieved
1610
1611     """
1612     result = self._SingleNodeCall(node, "impexp_status", [names])
1613
1614     if not result.fail_msg:
1615       decoded = []
1616
1617       for i in result.payload:
1618         if i is None:
1619           decoded.append(None)
1620           continue
1621         decoded.append(objects.ImportExportStatus.FromDict(i))
1622
1623       result.payload = decoded
1624
1625     return result
1626
1627   @_RpcTimeout(_TMO_NORMAL)
1628   def call_impexp_abort(self, node, name):
1629     """Aborts an import or export.
1630
1631     This is a single-node call.
1632
1633     @type node: string
1634     @param node: Node name
1635     @type name: string
1636     @param name: Import/export name
1637
1638     """
1639     return self._SingleNodeCall(node, "impexp_abort", [name])
1640
1641   @_RpcTimeout(_TMO_NORMAL)
1642   def call_impexp_cleanup(self, node, name):
1643     """Cleans up after an import or export.
1644
1645     This is a single-node call.
1646
1647     @type node: string
1648     @param node: Node name
1649     @type name: string
1650     @param name: Import/export name
1651
1652     """
1653     return self._SingleNodeCall(node, "impexp_cleanup", [name])