rpc: Convert import/export functions
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
71 _TMO_4HRS = 4 * 3600
72 _TMO_1DAY = 86400
73
74 # Timeout table that will be built later by decorators
75 # Guidelines for choosing timeouts:
76 # - call used during watcher: timeout -> 1min, _TMO_URGENT
77 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
78 # - other calls: 15 min, _TMO_NORMAL
79 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
80
81 _TIMEOUTS = {
82 }
83
84 #: Special value to describe an offline host
85 _OFFLINE = object()
86
87
88 def Init():
89   """Initializes the module-global HTTP client manager.
90
91   Must be called before using any RPC function and while exactly one thread is
92   running.
93
94   """
95   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
96   # one thread running. This check is just a safety measure -- it doesn't
97   # cover all cases.
98   assert threading.activeCount() == 1, \
99          "Found more than one active thread when initializing pycURL"
100
101   logging.info("Using PycURL %s", pycurl.version)
102
103   pycurl.global_init(pycurl.GLOBAL_ALL)
104
105
106 def Shutdown():
107   """Stops the module-global HTTP client manager.
108
109   Must be called before quitting the program and while exactly one thread is
110   running.
111
112   """
113   pycurl.global_cleanup()
114
115
116 def _ConfigRpcCurl(curl):
117   noded_cert = str(constants.NODED_CERT_FILE)
118
119   curl.setopt(pycurl.FOLLOWLOCATION, False)
120   curl.setopt(pycurl.CAINFO, noded_cert)
121   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
122   curl.setopt(pycurl.SSL_VERIFYPEER, True)
123   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
124   curl.setopt(pycurl.SSLCERT, noded_cert)
125   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
126   curl.setopt(pycurl.SSLKEY, noded_cert)
127   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
128
129
130 def _RpcTimeout(secs):
131   """Timeout decorator.
132
133   When applied to a rpc call_* function, it updates the global timeout
134   table with the given function/timeout.
135
136   """
137   def decorator(f):
138     name = f.__name__
139     assert name.startswith("call_")
140     _TIMEOUTS[name[len("call_"):]] = secs
141     return f
142   return decorator
143
144
145 def RunWithRPC(fn):
146   """RPC-wrapper decorator.
147
148   When applied to a function, it runs it with the RPC system
149   initialized, and it shutsdown the system afterwards. This means the
150   function must be called without RPC being initialized.
151
152   """
153   def wrapper(*args, **kwargs):
154     Init()
155     try:
156       return fn(*args, **kwargs)
157     finally:
158       Shutdown()
159   return wrapper
160
161
162 def _Compress(data):
163   """Compresses a string for transport over RPC.
164
165   Small amounts of data are not compressed.
166
167   @type data: str
168   @param data: Data
169   @rtype: tuple
170   @return: Encoded data to send
171
172   """
173   # Small amounts of data are not compressed
174   if len(data) < 512:
175     return (constants.RPC_ENCODING_NONE, data)
176
177   # Compress with zlib and encode in base64
178   return (constants.RPC_ENCODING_ZLIB_BASE64,
179           base64.b64encode(zlib.compress(data, 3)))
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     for attr_name in ["call", "data", "fail_msg",
230                       "node", "offline", "payload"]:
231       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232
233   @staticmethod
234   def _EnsureErr(val):
235     """Helper to ensure we return a 'True' value for error."""
236     if val:
237       return val
238     else:
239       return "No error information"
240
241   def Raise(self, msg, prereq=False, ecode=None):
242     """If the result has failed, raise an OpExecError.
243
244     This is used so that LU code doesn't have to check for each
245     result, but instead can call this function.
246
247     """
248     if not self.fail_msg:
249       return
250
251     if not msg: # one could pass None for default message
252       msg = ("Call '%s' to node '%s' has failed: %s" %
253              (self.call, self.node, self.fail_msg))
254     else:
255       msg = "%s: %s" % (msg, self.fail_msg)
256     if prereq:
257       ec = errors.OpPrereqError
258     else:
259       ec = errors.OpExecError
260     if ecode is not None:
261       args = (msg, ecode)
262     else:
263       args = (msg, )
264     raise ec(*args) # pylint: disable=W0142
265
266
267 def _SsconfResolver(node_list,
268                     ssc=ssconf.SimpleStore,
269                     nslookup_fn=netutils.Hostname.GetIP):
270   """Return addresses for given node names.
271
272   @type node_list: list
273   @param node_list: List of node names
274   @type ssc: class
275   @param ssc: SimpleStore class that is used to obtain node->ip mappings
276   @type nslookup_fn: callable
277   @param nslookup_fn: function use to do NS lookup
278   @rtype: list of tuple; (string, string)
279   @return: List of tuples containing node name and IP address
280
281   """
282   ss = ssc()
283   iplist = ss.GetNodePrimaryIPList()
284   family = ss.GetPrimaryIPFamily()
285   ipmap = dict(entry.split() for entry in iplist)
286
287   result = []
288   for node in node_list:
289     ip = ipmap.get(node)
290     if ip is None:
291       ip = nslookup_fn(node, family=family)
292     result.append((node, ip))
293
294   return result
295
296
297 class _StaticResolver:
298   def __init__(self, addresses):
299     """Initializes this class.
300
301     """
302     self._addresses = addresses
303
304   def __call__(self, hosts):
305     """Returns static addresses for hosts.
306
307     """
308     assert len(hosts) == len(self._addresses)
309     return zip(hosts, self._addresses)
310
311
312 def _CheckConfigNode(name, node):
313   """Checks if a node is online.
314
315   @type name: string
316   @param name: Node name
317   @type node: L{objects.Node} or None
318   @param node: Node object
319
320   """
321   if node is None:
322     # Depend on DNS for name resolution
323     ip = name
324   elif node.offline:
325     ip = _OFFLINE
326   else:
327     ip = node.primary_ip
328   return (name, ip)
329
330
331 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
332   """Calculate node addresses using configuration.
333
334   """
335   # Special case for single-host lookups
336   if len(hosts) == 1:
337     (name, ) = hosts
338     return [_CheckConfigNode(name, single_node_fn(name))]
339   else:
340     all_nodes = all_nodes_fn()
341     return [_CheckConfigNode(name, all_nodes.get(name, None))
342             for name in hosts]
343
344
345 class _RpcProcessor:
346   def __init__(self, resolver, port, lock_monitor_cb=None):
347     """Initializes this class.
348
349     @param resolver: callable accepting a list of hostnames, returning a list
350       of tuples containing name and IP address (IP address can be the name or
351       the special value L{_OFFLINE} to mark offline machines)
352     @type port: int
353     @param port: TCP port
354     @param lock_monitor_cb: Callable for registering with lock monitor
355
356     """
357     self._resolver = resolver
358     self._port = port
359     self._lock_monitor_cb = lock_monitor_cb
360
361   @staticmethod
362   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
363     """Prepares requests by sorting offline hosts into separate list.
364
365     """
366     results = {}
367     requests = {}
368
369     for (name, ip) in hosts:
370       if ip is _OFFLINE:
371         # Node is marked as offline
372         results[name] = RpcResult(node=name, offline=True, call=procedure)
373       else:
374         requests[name] = \
375           http.client.HttpClientRequest(str(ip), port,
376                                         http.HTTP_PUT, str("/%s" % procedure),
377                                         headers=_RPC_CLIENT_HEADERS,
378                                         post_data=body,
379                                         read_timeout=read_timeout,
380                                         nicename="%s/%s" % (name, procedure),
381                                         curl_config_fn=_ConfigRpcCurl)
382
383     return (results, requests)
384
385   @staticmethod
386   def _CombineResults(results, requests, procedure):
387     """Combines pre-computed results for offline hosts with actual call results.
388
389     """
390     for name, req in requests.items():
391       if req.success and req.resp_status_code == http.HTTP_OK:
392         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393                                 node=name, call=procedure)
394       else:
395         # TODO: Better error reporting
396         if req.error:
397           msg = req.error
398         else:
399           msg = req.resp_body
400
401         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402         host_result = RpcResult(data=msg, failed=True, node=name,
403                                 call=procedure)
404
405       results[name] = host_result
406
407     return results
408
409   def __call__(self, hosts, procedure, body, read_timeout=None,
410                _req_process_fn=http.client.ProcessRequests):
411     """Makes an RPC request to a number of nodes.
412
413     @type hosts: sequence
414     @param hosts: Hostnames
415     @type procedure: string
416     @param procedure: Request path
417     @type body: string
418     @param body: Request body
419     @type read_timeout: int or None
420     @param read_timeout: Read timeout for request
421
422     """
423     if read_timeout is None:
424       read_timeout = _TIMEOUTS.get(procedure, None)
425
426     assert read_timeout is not None, \
427       "Missing RPC read timeout for procedure '%s'" % procedure
428
429     (results, requests) = \
430       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
431                             str(body), read_timeout)
432
433     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
434
435     assert not frozenset(results).intersection(requests)
436
437     return self._CombineResults(results, requests, procedure)
438
439
440 def _EncodeImportExportIO(ieio, ieioargs):
441   """Encodes import/export I/O information.
442
443   """
444   if ieio == constants.IEIO_RAW_DISK:
445     assert len(ieioargs) == 1
446     return (ieioargs[0].ToDict(), )
447
448   if ieio == constants.IEIO_SCRIPT:
449     assert len(ieioargs) == 2
450     return (ieioargs[0].ToDict(), ieioargs[1])
451
452   return ieioargs
453
454
455 class RpcRunner(_generated_rpc.RpcClientDefault):
456   """RPC runner class.
457
458   """
459   def __init__(self, context):
460     """Initialized the RPC runner.
461
462     @type context: C{masterd.GanetiContext}
463     @param context: Ganeti context
464
465     """
466     _generated_rpc.RpcClientDefault.__init__(self)
467
468     self._cfg = context.cfg
469     self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
470                                               self._cfg.GetNodeInfo,
471                                               self._cfg.GetAllNodesInfo),
472                                netutils.GetDaemonPort(constants.NODED),
473                                lock_monitor_cb=context.glm.AddToLockMonitor)
474
475   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
476     """Convert the given instance to a dict.
477
478     This is done via the instance's ToDict() method and additionally
479     we fill the hvparams with the cluster defaults.
480
481     @type instance: L{objects.Instance}
482     @param instance: an Instance object
483     @type hvp: dict or None
484     @param hvp: a dictionary with overridden hypervisor parameters
485     @type bep: dict or None
486     @param bep: a dictionary with overridden backend parameters
487     @type osp: dict or None
488     @param osp: a dictionary with overridden os parameters
489     @rtype: dict
490     @return: the instance dict, with the hvparams filled with the
491         cluster defaults
492
493     """
494     idict = instance.ToDict()
495     cluster = self._cfg.GetClusterInfo()
496     idict["hvparams"] = cluster.FillHV(instance)
497     if hvp is not None:
498       idict["hvparams"].update(hvp)
499     idict["beparams"] = cluster.FillBE(instance)
500     if bep is not None:
501       idict["beparams"].update(bep)
502     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
503     if osp is not None:
504       idict["osparams"].update(osp)
505     for nic in idict["nics"]:
506       nic['nicparams'] = objects.FillDict(
507         cluster.nicparams[constants.PP_DEFAULT],
508         nic['nicparams'])
509     return idict
510
511   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
512     """Helper for making a multi-node call
513
514     """
515     body = serializer.DumpJson(args, indent=False)
516     return self._proc(node_list, procedure, body, read_timeout=read_timeout)
517
518   def _Call(self, node_list, procedure, timeout, args):
519     """Entry point for automatically generated RPC wrappers.
520
521     """
522     return self._MultiNodeCall(node_list, procedure, args, read_timeout=timeout)
523
524   @staticmethod
525   def _StaticMultiNodeCall(node_list, procedure, args,
526                            address_list=None, read_timeout=None):
527     """Helper for making a multi-node static call
528
529     """
530     body = serializer.DumpJson(args, indent=False)
531
532     if address_list is None:
533       resolver = _SsconfResolver
534     else:
535       # Caller provided an address list
536       resolver = _StaticResolver(address_list)
537
538     proc = _RpcProcessor(resolver,
539                          netutils.GetDaemonPort(constants.NODED))
540     return proc(node_list, procedure, body, read_timeout=read_timeout)
541
542   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
543     """Helper for making a single-node call
544
545     """
546     body = serializer.DumpJson(args, indent=False)
547     return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
548
549   @classmethod
550   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
551     """Helper for making a single-node static call
552
553     """
554     body = serializer.DumpJson(args, indent=False)
555     proc = _RpcProcessor(_SsconfResolver,
556                          netutils.GetDaemonPort(constants.NODED))
557     return proc([node], procedure, body, read_timeout=read_timeout)[node]
558
559   @staticmethod
560   def _BlockdevFindPostProc(result):
561     if not result.fail_msg and result.payload is not None:
562       result.payload = objects.BlockDevStatus.FromDict(result.payload)
563     return result
564
565   @staticmethod
566   def _BlockdevGetMirrorStatusPostProc(result):
567     if not result.fail_msg:
568       result.payload = [objects.BlockDevStatus.FromDict(i)
569                         for i in result.payload]
570     return result
571
572   @staticmethod
573   def _BlockdevGetMirrorStatusMultiPostProc(result):
574     for nres in result.values():
575       if nres.fail_msg:
576         continue
577
578       for idx, (success, status) in enumerate(nres.payload):
579         if success:
580           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
581
582     return result
583
584   @staticmethod
585   def _OsGetPostProc(result):
586     if not result.fail_msg and isinstance(result.payload, dict):
587       result.payload = objects.OS.FromDict(result.payload)
588     return result
589
590   @staticmethod
591   def _PrepareFinalizeExportDisks(snap_disks):
592     flat_disks = []
593
594     for disk in snap_disks:
595       if isinstance(disk, bool):
596         flat_disks.append(disk)
597       else:
598         flat_disks.append(disk.ToDict())
599
600     return flat_disks
601
602   @staticmethod
603   def _ImpExpStatusPostProc(result):
604     """Post-processor for import/export status.
605
606     @rtype: Payload containing list of L{objects.ImportExportStatus} instances
607     @return: Returns a list of the state of each named import/export or None if
608              a status couldn't be retrieved
609
610     """
611     if not result.fail_msg:
612       decoded = []
613
614       for i in result.payload:
615         if i is None:
616           decoded.append(None)
617           continue
618         decoded.append(objects.ImportExportStatus.FromDict(i))
619
620       result.payload = decoded
621
622     return result
623
624   #
625   # Begin RPC calls
626   #
627
628   @_RpcTimeout(_TMO_URGENT)
629   def call_bdev_sizes(self, node_list, devices):
630     """Gets the sizes of requested block devices present on a node
631
632     This is a multi-node call.
633
634     """
635     return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
636
637   @_RpcTimeout(_TMO_URGENT)
638   def call_lv_list(self, node_list, vg_name):
639     """Gets the logical volumes present in a given volume group.
640
641     This is a multi-node call.
642
643     """
644     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
645
646   @_RpcTimeout(_TMO_URGENT)
647   def call_vg_list(self, node_list):
648     """Gets the volume group list.
649
650     This is a multi-node call.
651
652     """
653     return self._MultiNodeCall(node_list, "vg_list", [])
654
655   @_RpcTimeout(_TMO_NORMAL)
656   def call_storage_list(self, node_list, su_name, su_args, name, fields):
657     """Get list of storage units.
658
659     This is a multi-node call.
660
661     """
662     return self._MultiNodeCall(node_list, "storage_list",
663                                [su_name, su_args, name, fields])
664
665   @_RpcTimeout(_TMO_NORMAL)
666   def call_storage_modify(self, node, su_name, su_args, name, changes):
667     """Modify a storage unit.
668
669     This is a single-node call.
670
671     """
672     return self._SingleNodeCall(node, "storage_modify",
673                                 [su_name, su_args, name, changes])
674
675   @_RpcTimeout(_TMO_NORMAL)
676   def call_storage_execute(self, node, su_name, su_args, name, op):
677     """Executes an operation on a storage unit.
678
679     This is a single-node call.
680
681     """
682     return self._SingleNodeCall(node, "storage_execute",
683                                 [su_name, su_args, name, op])
684
685   @_RpcTimeout(_TMO_URGENT)
686   def call_bridges_exist(self, node, bridges_list):
687     """Checks if a node has all the bridges given.
688
689     This method checks if all bridges given in the bridges_list are
690     present on the remote node, so that an instance that uses interfaces
691     on those bridges can be started.
692
693     This is a single-node call.
694
695     """
696     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
697
698   @_RpcTimeout(_TMO_NORMAL)
699   def call_instance_start(self, node, instance, hvp, bep, startup_paused):
700     """Starts an instance.
701
702     This is a single-node call.
703
704     """
705     idict = self._InstDict(instance, hvp=hvp, bep=bep)
706     return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
707
708   @_RpcTimeout(_TMO_NORMAL)
709   def call_instance_shutdown(self, node, instance, timeout):
710     """Stops an instance.
711
712     This is a single-node call.
713
714     """
715     return self._SingleNodeCall(node, "instance_shutdown",
716                                 [self._InstDict(instance), timeout])
717
718   @_RpcTimeout(_TMO_NORMAL)
719   def call_migration_info(self, node, instance):
720     """Gather the information necessary to prepare an instance migration.
721
722     This is a single-node call.
723
724     @type node: string
725     @param node: the node on which the instance is currently running
726     @type instance: C{objects.Instance}
727     @param instance: the instance definition
728
729     """
730     return self._SingleNodeCall(node, "migration_info",
731                                 [self._InstDict(instance)])
732
733   @_RpcTimeout(_TMO_NORMAL)
734   def call_accept_instance(self, node, instance, info, target):
735     """Prepare a node to accept an instance.
736
737     This is a single-node call.
738
739     @type node: string
740     @param node: the target node for the migration
741     @type instance: C{objects.Instance}
742     @param instance: the instance definition
743     @type info: opaque/hypervisor specific (string/data)
744     @param info: result for the call_migration_info call
745     @type target: string
746     @param target: target hostname (usually ip address) (on the node itself)
747
748     """
749     return self._SingleNodeCall(node, "accept_instance",
750                                 [self._InstDict(instance), info, target])
751
752   @_RpcTimeout(_TMO_NORMAL)
753   def call_instance_finalize_migration_dst(self, node, instance, info, success):
754     """Finalize any target-node migration specific operation.
755
756     This is called both in case of a successful migration and in case of error
757     (in which case it should abort the migration).
758
759     This is a single-node call.
760
761     @type node: string
762     @param node: the target node for the migration
763     @type instance: C{objects.Instance}
764     @param instance: the instance definition
765     @type info: opaque/hypervisor specific (string/data)
766     @param info: result for the call_migration_info call
767     @type success: boolean
768     @param success: whether the migration was a success or a failure
769
770     """
771     return self._SingleNodeCall(node, "instance_finalize_migration_dst",
772                                 [self._InstDict(instance), info, success])
773
774   @_RpcTimeout(_TMO_SLOW)
775   def call_instance_migrate(self, node, instance, target, live):
776     """Migrate an instance.
777
778     This is a single-node call.
779
780     @type node: string
781     @param node: the node on which the instance is currently running
782     @type instance: C{objects.Instance}
783     @param instance: the instance definition
784     @type target: string
785     @param target: the target node name
786     @type live: boolean
787     @param live: whether the migration should be done live or not (the
788         interpretation of this parameter is left to the hypervisor)
789
790     """
791     return self._SingleNodeCall(node, "instance_migrate",
792                                 [self._InstDict(instance), target, live])
793
794   @_RpcTimeout(_TMO_SLOW)
795   def call_instance_finalize_migration_src(self, node, instance, success, live):
796     """Finalize the instance migration on the source node.
797
798     This is a single-node call.
799
800     @type instance: L{objects.Instance}
801     @param instance: the instance that was migrated
802     @type success: bool
803     @param success: whether the migration succeeded or not
804     @type live: bool
805     @param live: whether the user requested a live migration or not
806
807     """
808     return self._SingleNodeCall(node, "instance_finalize_migration_src",
809                                 [self._InstDict(instance), success, live])
810
811   @_RpcTimeout(_TMO_SLOW)
812   def call_instance_get_migration_status(self, node, instance):
813     """Report migration status.
814
815     This is a single-node call that must be executed on the source node.
816
817     @type instance: L{objects.Instance}
818     @param instance: the instance that is being migrated
819     @rtype: L{objects.MigrationStatus}
820     @return: the status of the current migration (one of
821              L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
822              progress info that can be retrieved from the hypervisor
823
824     """
825     result = self._SingleNodeCall(node, "instance_get_migration_status",
826                                   [self._InstDict(instance)])
827     if not result.fail_msg and result.payload is not None:
828       result.payload = objects.MigrationStatus.FromDict(result.payload)
829     return result
830
831   @_RpcTimeout(_TMO_NORMAL)
832   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
833     """Reboots an instance.
834
835     This is a single-node call.
836
837     """
838     return self._SingleNodeCall(node, "instance_reboot",
839                                 [self._InstDict(inst), reboot_type,
840                                  shutdown_timeout])
841
842   @_RpcTimeout(_TMO_1DAY)
843   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
844     """Installs an OS on the given instance.
845
846     This is a single-node call.
847
848     """
849     return self._SingleNodeCall(node, "instance_os_add",
850                                 [self._InstDict(inst, osp=osparams),
851                                  reinstall, debug])
852
853   @_RpcTimeout(_TMO_SLOW)
854   def call_instance_run_rename(self, node, inst, old_name, debug):
855     """Run the OS rename script for an instance.
856
857     This is a single-node call.
858
859     """
860     return self._SingleNodeCall(node, "instance_run_rename",
861                                 [self._InstDict(inst), old_name, debug])
862
863   @_RpcTimeout(_TMO_URGENT)
864   def call_instance_info(self, node, instance, hname):
865     """Returns information about a single instance.
866
867     This is a single-node call.
868
869     @type node: list
870     @param node: the list of nodes to query
871     @type instance: string
872     @param instance: the instance name
873     @type hname: string
874     @param hname: the hypervisor type of the instance
875
876     """
877     return self._SingleNodeCall(node, "instance_info", [instance, hname])
878
879   @_RpcTimeout(_TMO_NORMAL)
880   def call_instance_migratable(self, node, instance):
881     """Checks whether the given instance can be migrated.
882
883     This is a single-node call.
884
885     @param node: the node to query
886     @type instance: L{objects.Instance}
887     @param instance: the instance to check
888
889
890     """
891     return self._SingleNodeCall(node, "instance_migratable",
892                                 [self._InstDict(instance)])
893
894   @_RpcTimeout(_TMO_URGENT)
895   def call_all_instances_info(self, node_list, hypervisor_list):
896     """Returns information about all instances on the given nodes.
897
898     This is a multi-node call.
899
900     @type node_list: list
901     @param node_list: the list of nodes to query
902     @type hypervisor_list: list
903     @param hypervisor_list: the hypervisors to query for instances
904
905     """
906     return self._MultiNodeCall(node_list, "all_instances_info",
907                                [hypervisor_list])
908
909   @_RpcTimeout(_TMO_URGENT)
910   def call_instance_list(self, node_list, hypervisor_list):
911     """Returns the list of running instances on a given node.
912
913     This is a multi-node call.
914
915     @type node_list: list
916     @param node_list: the list of nodes to query
917     @type hypervisor_list: list
918     @param hypervisor_list: the hypervisors to query for instances
919
920     """
921     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
922
923   @_RpcTimeout(_TMO_FAST)
924   def call_node_has_ip_address(self, node, address):
925     """Checks if a node has the given IP address.
926
927     This is a single-node call.
928
929     """
930     return self._SingleNodeCall(node, "node_has_ip_address", [address])
931
932   @_RpcTimeout(_TMO_URGENT)
933   def call_node_info(self, node_list, vg_name, hypervisor_type):
934     """Return node information.
935
936     This will return memory information and volume group size and free
937     space.
938
939     This is a multi-node call.
940
941     @type node_list: list
942     @param node_list: the list of nodes to query
943     @type vg_name: C{string}
944     @param vg_name: the name of the volume group to ask for disk space
945         information
946     @type hypervisor_type: C{str}
947     @param hypervisor_type: the name of the hypervisor to ask for
948         memory information
949
950     """
951     return self._MultiNodeCall(node_list, "node_info",
952                                [vg_name, hypervisor_type])
953
954   @_RpcTimeout(_TMO_NORMAL)
955   def call_etc_hosts_modify(self, node, mode, name, ip):
956     """Modify hosts file with name
957
958     @type node: string
959     @param node: The node to call
960     @type mode: string
961     @param mode: The mode to operate. Currently "add" or "remove"
962     @type name: string
963     @param name: The host name to be modified
964     @type ip: string
965     @param ip: The ip of the entry (just valid if mode is "add")
966
967     """
968     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
969
970   @_RpcTimeout(_TMO_NORMAL)
971   def call_node_verify(self, node_list, checkdict, cluster_name):
972     """Request verification of given parameters.
973
974     This is a multi-node call.
975
976     """
977     return self._MultiNodeCall(node_list, "node_verify",
978                                [checkdict, cluster_name])
979
980   @classmethod
981   @_RpcTimeout(_TMO_FAST)
982   def call_node_start_master_daemons(cls, node, no_voting):
983     """Starts master daemons on a node.
984
985     This is a single-node call.
986
987     """
988     return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
989                                      [no_voting])
990
991   @classmethod
992   @_RpcTimeout(_TMO_FAST)
993   def call_node_activate_master_ip(cls, node):
994     """Activates master IP on a node.
995
996     This is a single-node call.
997
998     """
999     return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
1000
1001   @classmethod
1002   @_RpcTimeout(_TMO_FAST)
1003   def call_node_stop_master(cls, node):
1004     """Deactivates master IP and stops master daemons on a node.
1005
1006     This is a single-node call.
1007
1008     """
1009     return cls._StaticSingleNodeCall(node, "node_stop_master", [])
1010
1011   @classmethod
1012   @_RpcTimeout(_TMO_FAST)
1013   def call_node_deactivate_master_ip(cls, node):
1014     """Deactivates master IP on a node.
1015
1016     This is a single-node call.
1017
1018     """
1019     return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
1020
1021   @classmethod
1022   @_RpcTimeout(_TMO_FAST)
1023   def call_node_change_master_netmask(cls, node, netmask):
1024     """Change master IP netmask.
1025
1026     This is a single-node call.
1027
1028     """
1029     return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
1030                   [netmask])
1031
1032   @classmethod
1033   @_RpcTimeout(_TMO_URGENT)
1034   def call_master_info(cls, node_list):
1035     """Query master info.
1036
1037     This is a multi-node call.
1038
1039     """
1040     # TODO: should this method query down nodes?
1041     return cls._StaticMultiNodeCall(node_list, "master_info", [])
1042
1043   @classmethod
1044   @_RpcTimeout(_TMO_URGENT)
1045   def call_version(cls, node_list):
1046     """Query node version.
1047
1048     This is a multi-node call.
1049
1050     """
1051     return cls._StaticMultiNodeCall(node_list, "version", [])
1052
1053   @_RpcTimeout(_TMO_NORMAL)
1054   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
1055     """Request creation of a given block device.
1056
1057     This is a single-node call.
1058
1059     """
1060     return self._SingleNodeCall(node, "blockdev_create",
1061                                 [bdev.ToDict(), size, owner, on_primary, info])
1062
1063   @_RpcTimeout(_TMO_SLOW)
1064   def call_blockdev_wipe(self, node, bdev, offset, size):
1065     """Request wipe at given offset with given size of a block device.
1066
1067     This is a single-node call.
1068
1069     """
1070     return self._SingleNodeCall(node, "blockdev_wipe",
1071                                 [bdev.ToDict(), offset, size])
1072
1073   @_RpcTimeout(_TMO_NORMAL)
1074   def call_blockdev_remove(self, node, bdev):
1075     """Request removal of a given block device.
1076
1077     This is a single-node call.
1078
1079     """
1080     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1081
1082   @_RpcTimeout(_TMO_NORMAL)
1083   def call_blockdev_rename(self, node, devlist):
1084     """Request rename of the given block devices.
1085
1086     This is a single-node call.
1087
1088     """
1089     return self._SingleNodeCall(node, "blockdev_rename",
1090                                 [[(d.ToDict(), uid) for d, uid in devlist]])
1091
1092   @_RpcTimeout(_TMO_NORMAL)
1093   def call_blockdev_pause_resume_sync(self, node, disks, pause):
1094     """Request a pause/resume of given block device.
1095
1096     This is a single-node call.
1097
1098     """
1099     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1100                                 [[bdev.ToDict() for bdev in disks], pause])
1101
1102   @_RpcTimeout(_TMO_NORMAL)
1103   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1104     """Request assembling of a given block device.
1105
1106     This is a single-node call.
1107
1108     """
1109     return self._SingleNodeCall(node, "blockdev_assemble",
1110                                 [disk.ToDict(), owner, on_primary, idx])
1111
1112   @_RpcTimeout(_TMO_NORMAL)
1113   def call_blockdev_shutdown(self, node, disk):
1114     """Request shutdown of a given block device.
1115
1116     This is a single-node call.
1117
1118     """
1119     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1120
1121   @_RpcTimeout(_TMO_NORMAL)
1122   def call_blockdev_addchildren(self, node, bdev, ndevs):
1123     """Request adding a list of children to a (mirroring) device.
1124
1125     This is a single-node call.
1126
1127     """
1128     return self._SingleNodeCall(node, "blockdev_addchildren",
1129                                 [bdev.ToDict(),
1130                                  [disk.ToDict() for disk in ndevs]])
1131
1132   @_RpcTimeout(_TMO_NORMAL)
1133   def call_blockdev_removechildren(self, node, bdev, ndevs):
1134     """Request removing a list of children from a (mirroring) device.
1135
1136     This is a single-node call.
1137
1138     """
1139     return self._SingleNodeCall(node, "blockdev_removechildren",
1140                                 [bdev.ToDict(),
1141                                  [disk.ToDict() for disk in ndevs]])
1142
1143   @_RpcTimeout(_TMO_NORMAL)
1144   def call_blockdev_getmirrorstatus(self, node, disks):
1145     """Request status of a (mirroring) device.
1146
1147     This is a single-node call.
1148
1149     """
1150     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1151                                   [dsk.ToDict() for dsk in disks])
1152     if not result.fail_msg:
1153       result.payload = [objects.BlockDevStatus.FromDict(i)
1154                         for i in result.payload]
1155     return result
1156
1157   @_RpcTimeout(_TMO_NORMAL)
1158   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1159     """Request status of (mirroring) devices from multiple nodes.
1160
1161     This is a multi-node call.
1162
1163     """
1164     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1165                                  [dict((name, [dsk.ToDict() for dsk in disks])
1166                                        for name, disks in node_disks.items())])
1167     for nres in result.values():
1168       if nres.fail_msg:
1169         continue
1170
1171       for idx, (success, status) in enumerate(nres.payload):
1172         if success:
1173           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1174
1175     return result
1176
1177   @_RpcTimeout(_TMO_NORMAL)
1178   def call_blockdev_find(self, node, disk):
1179     """Request identification of a given block device.
1180
1181     This is a single-node call.
1182
1183     """
1184     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1185     if not result.fail_msg and result.payload is not None:
1186       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1187     return result
1188
1189   @_RpcTimeout(_TMO_NORMAL)
1190   def call_blockdev_close(self, node, instance_name, disks):
1191     """Closes the given block devices.
1192
1193     This is a single-node call.
1194
1195     """
1196     params = [instance_name, [cf.ToDict() for cf in disks]]
1197     return self._SingleNodeCall(node, "blockdev_close", params)
1198
1199   @_RpcTimeout(_TMO_NORMAL)
1200   def call_blockdev_getsize(self, node, disks):
1201     """Returns the size of the given disks.
1202
1203     This is a single-node call.
1204
1205     """
1206     params = [[cf.ToDict() for cf in disks]]
1207     return self._SingleNodeCall(node, "blockdev_getsize", params)
1208
1209   @_RpcTimeout(_TMO_NORMAL)
1210   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1211     """Disconnects the network of the given drbd devices.
1212
1213     This is a multi-node call.
1214
1215     """
1216     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1217                                [nodes_ip, [cf.ToDict() for cf in disks]])
1218
1219   @_RpcTimeout(_TMO_NORMAL)
1220   def call_drbd_attach_net(self, node_list, nodes_ip,
1221                            disks, instance_name, multimaster):
1222     """Disconnects the given drbd devices.
1223
1224     This is a multi-node call.
1225
1226     """
1227     return self._MultiNodeCall(node_list, "drbd_attach_net",
1228                                [nodes_ip, [cf.ToDict() for cf in disks],
1229                                 instance_name, multimaster])
1230
1231   @_RpcTimeout(_TMO_SLOW)
1232   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1233     """Waits for the synchronization of drbd devices is complete.
1234
1235     This is a multi-node call.
1236
1237     """
1238     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1239                                [nodes_ip, [cf.ToDict() for cf in disks]])
1240
1241   @_RpcTimeout(_TMO_URGENT)
1242   def call_drbd_helper(self, node_list):
1243     """Gets drbd helper.
1244
1245     This is a multi-node call.
1246
1247     """
1248     return self._MultiNodeCall(node_list, "drbd_helper", [])
1249
1250   @classmethod
1251   @_RpcTimeout(_TMO_NORMAL)
1252   def call_upload_file(cls, node_list, file_name, address_list=None):
1253     """Upload a file.
1254
1255     The node will refuse the operation in case the file is not on the
1256     approved file list.
1257
1258     This is a multi-node call.
1259
1260     @type node_list: list
1261     @param node_list: the list of node names to upload to
1262     @type file_name: str
1263     @param file_name: the filename to upload
1264     @type address_list: list or None
1265     @keyword address_list: an optional list of node addresses, in order
1266         to optimize the RPC speed
1267
1268     """
1269     file_contents = utils.ReadFile(file_name)
1270     data = _Compress(file_contents)
1271     st = os.stat(file_name)
1272     getents = runtime.GetEnts()
1273     params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1274               getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1275     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1276                                     address_list=address_list)
1277
1278   @classmethod
1279   @_RpcTimeout(_TMO_NORMAL)
1280   def call_write_ssconf_files(cls, node_list, values):
1281     """Write ssconf files.
1282
1283     This is a multi-node call.
1284
1285     """
1286     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1287
1288   @_RpcTimeout(_TMO_NORMAL)
1289   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1290     """Runs OOB.
1291
1292     This is a single-node call.
1293
1294     """
1295     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1296                                                   remote_node, timeout])
1297
1298   @_RpcTimeout(_TMO_FAST)
1299   def call_os_diagnose(self, node_list):
1300     """Request a diagnose of OS definitions.
1301
1302     This is a multi-node call.
1303
1304     """
1305     return self._MultiNodeCall(node_list, "os_diagnose", [])
1306
1307   @_RpcTimeout(_TMO_FAST)
1308   def call_os_get(self, node, name):
1309     """Returns an OS definition.
1310
1311     This is a single-node call.
1312
1313     """
1314     result = self._SingleNodeCall(node, "os_get", [name])
1315     if not result.fail_msg and isinstance(result.payload, dict):
1316       result.payload = objects.OS.FromDict(result.payload)
1317     return result
1318
1319   @_RpcTimeout(_TMO_FAST)
1320   def call_os_validate(self, nodes, required, name, checks, params):
1321     """Run a validation routine for a given OS.
1322
1323     This is a multi-node call.
1324
1325     """
1326     return self._MultiNodeCall(nodes, "os_validate",
1327                                [required, name, checks, params])
1328
1329   @_RpcTimeout(_TMO_NORMAL)
1330   def call_hooks_runner(self, node_list, hpath, phase, env):
1331     """Call the hooks runner.
1332
1333     Args:
1334       - op: the OpCode instance
1335       - env: a dictionary with the environment
1336
1337     This is a multi-node call.
1338
1339     """
1340     params = [hpath, phase, env]
1341     return self._MultiNodeCall(node_list, "hooks_runner", params)
1342
1343   @_RpcTimeout(_TMO_NORMAL)
1344   def call_iallocator_runner(self, node, name, idata):
1345     """Call an iallocator on a remote node
1346
1347     Args:
1348       - name: the iallocator name
1349       - input: the json-encoded input string
1350
1351     This is a single-node call.
1352
1353     """
1354     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1355
1356   @_RpcTimeout(_TMO_NORMAL)
1357   def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1358     """Request a snapshot of the given block device.
1359
1360     This is a single-node call.
1361
1362     """
1363     return self._SingleNodeCall(node, "blockdev_grow",
1364                                 [cf_bdev.ToDict(), amount, dryrun])
1365
1366   @_RpcTimeout(_TMO_1DAY)
1367   def call_blockdev_export(self, node, cf_bdev,
1368                            dest_node, dest_path, cluster_name):
1369     """Export a given disk to another node.
1370
1371     This is a single-node call.
1372
1373     """
1374     return self._SingleNodeCall(node, "blockdev_export",
1375                                 [cf_bdev.ToDict(), dest_node, dest_path,
1376                                  cluster_name])
1377
1378   @_RpcTimeout(_TMO_NORMAL)
1379   def call_blockdev_snapshot(self, node, cf_bdev):
1380     """Request a snapshot of the given block device.
1381
1382     This is a single-node call.
1383
1384     """
1385     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1386
1387   @classmethod
1388   @_RpcTimeout(_TMO_NORMAL)
1389   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1390     """Requests a node to clean the cluster information it has.
1391
1392     This will remove the configuration information from the ganeti data
1393     dir.
1394
1395     This is a single-node call.
1396
1397     """
1398     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1399                                      [modify_ssh_setup])
1400
1401   @_RpcTimeout(_TMO_FAST)
1402   def call_node_volumes(self, node_list):
1403     """Gets all volumes on node(s).
1404
1405     This is a multi-node call.
1406
1407     """
1408     return self._MultiNodeCall(node_list, "node_volumes", [])
1409
1410   @_RpcTimeout(_TMO_FAST)
1411   def call_node_demote_from_mc(self, node):
1412     """Demote a node from the master candidate role.
1413
1414     This is a single-node call.
1415
1416     """
1417     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1418
1419   @_RpcTimeout(_TMO_NORMAL)
1420   def call_node_powercycle(self, node, hypervisor):
1421     """Tries to powercycle a node.
1422
1423     This is a single-node call.
1424
1425     """
1426     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1427
1428   @_RpcTimeout(None)
1429   def call_test_delay(self, node_list, duration):
1430     """Sleep for a fixed time on given node(s).
1431
1432     This is a multi-node call.
1433
1434     """
1435     return self._MultiNodeCall(node_list, "test_delay", [duration],
1436                                read_timeout=int(duration + 5))
1437
1438   @_RpcTimeout(_TMO_FAST)
1439   def call_file_storage_dir_create(self, node, file_storage_dir):
1440     """Create the given file storage directory.
1441
1442     This is a single-node call.
1443
1444     """
1445     return self._SingleNodeCall(node, "file_storage_dir_create",
1446                                 [file_storage_dir])
1447
1448   @_RpcTimeout(_TMO_FAST)
1449   def call_file_storage_dir_remove(self, node, file_storage_dir):
1450     """Remove the given file storage directory.
1451
1452     This is a single-node call.
1453
1454     """
1455     return self._SingleNodeCall(node, "file_storage_dir_remove",
1456                                 [file_storage_dir])
1457
1458   @_RpcTimeout(_TMO_FAST)
1459   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1460                                    new_file_storage_dir):
1461     """Rename file storage directory.
1462
1463     This is a single-node call.
1464
1465     """
1466     return self._SingleNodeCall(node, "file_storage_dir_rename",
1467                                 [old_file_storage_dir, new_file_storage_dir])
1468
1469   @classmethod
1470   @_RpcTimeout(_TMO_URGENT)
1471   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1472     """Update job queue.
1473
1474     This is a multi-node call.
1475
1476     """
1477     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1478                                     [file_name, _Compress(content)],
1479                                     address_list=address_list)
1480
1481   @classmethod
1482   @_RpcTimeout(_TMO_NORMAL)
1483   def call_jobqueue_purge(cls, node):
1484     """Purge job queue.
1485
1486     This is a single-node call.
1487
1488     """
1489     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1490
1491   @classmethod
1492   @_RpcTimeout(_TMO_URGENT)
1493   def call_jobqueue_rename(cls, node_list, address_list, rename):
1494     """Rename a job queue file.
1495
1496     This is a multi-node call.
1497
1498     """
1499     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1500                                     address_list=address_list)
1501
1502   @_RpcTimeout(_TMO_NORMAL)
1503   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1504     """Validate the hypervisor params.
1505
1506     This is a multi-node call.
1507
1508     @type node_list: list
1509     @param node_list: the list of nodes to query
1510     @type hvname: string
1511     @param hvname: the hypervisor name
1512     @type hvparams: dict
1513     @param hvparams: the hypervisor parameters to be validated
1514
1515     """
1516     cluster = self._cfg.GetClusterInfo()
1517     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1518     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1519                                [hvname, hv_full])
1520
1521   @_RpcTimeout(_TMO_NORMAL)
1522   def call_x509_cert_create(self, node, validity):
1523     """Creates a new X509 certificate for SSL/TLS.
1524
1525     This is a single-node call.
1526
1527     @type validity: int
1528     @param validity: Validity in seconds
1529
1530     """
1531     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1532
1533   @_RpcTimeout(_TMO_NORMAL)
1534   def call_x509_cert_remove(self, node, name):
1535     """Removes a X509 certificate.
1536
1537     This is a single-node call.
1538
1539     @type name: string
1540     @param name: Certificate name
1541
1542     """
1543     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1544
1545   @_RpcTimeout(_TMO_NORMAL)
1546   def call_import_start(self, node, opts, instance, component,
1547                         dest, dest_args):
1548     """Starts a listener for an import.
1549
1550     This is a single-node call.
1551
1552     @type node: string
1553     @param node: Node name
1554     @type instance: C{objects.Instance}
1555     @param instance: Instance object
1556     @type component: string
1557     @param component: which part of the instance is being imported
1558
1559     """
1560     return self._SingleNodeCall(node, "import_start",
1561                                 [opts.ToDict(),
1562                                  self._InstDict(instance), component, dest,
1563                                  _EncodeImportExportIO(dest, dest_args)])
1564
1565   @_RpcTimeout(_TMO_NORMAL)
1566   def call_export_start(self, node, opts, host, port,
1567                         instance, component, source, source_args):
1568     """Starts an export daemon.
1569
1570     This is a single-node call.
1571
1572     @type node: string
1573     @param node: Node name
1574     @type instance: C{objects.Instance}
1575     @param instance: Instance object
1576     @type component: string
1577     @param component: which part of the instance is being imported
1578
1579     """
1580     return self._SingleNodeCall(node, "export_start",
1581                                 [opts.ToDict(), host, port,
1582                                  self._InstDict(instance),
1583                                  component, source,
1584                                  _EncodeImportExportIO(source, source_args)])