Implement grow dry-run at RPC level
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   "Expect:",
59   ]
60
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75
76 _TIMEOUTS = {
77 }
78
79
80 def Init():
81   """Initializes the module-global HTTP client manager.
82
83   Must be called before using any RPC function and while exactly one thread is
84   running.
85
86   """
87   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88   # one thread running. This check is just a safety measure -- it doesn't
89   # cover all cases.
90   assert threading.activeCount() == 1, \
91          "Found more than one active thread when initializing pycURL"
92
93   logging.info("Using PycURL %s", pycurl.version)
94
95   pycurl.global_init(pycurl.GLOBAL_ALL)
96
97
98 def Shutdown():
99   """Stops the module-global HTTP client manager.
100
101   Must be called before quitting the program and while exactly one thread is
102   running.
103
104   """
105   pycurl.global_cleanup()
106
107
108 def _ConfigRpcCurl(curl):
109   noded_cert = str(constants.NODED_CERT_FILE)
110
111   curl.setopt(pycurl.FOLLOWLOCATION, False)
112   curl.setopt(pycurl.CAINFO, noded_cert)
113   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114   curl.setopt(pycurl.SSL_VERIFYPEER, True)
115   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116   curl.setopt(pycurl.SSLCERT, noded_cert)
117   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118   curl.setopt(pycurl.SSLKEY, noded_cert)
119   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
125
126
127 class _RpcThreadLocal(_threading.local):
128   def GetHttpClientPool(self):
129     """Returns a per-thread HTTP client pool.
130
131     @rtype: L{http.client.HttpClientPool}
132
133     """
134     try:
135       pool = self.hcp
136     except AttributeError:
137       pool = http.client.HttpClientPool(_ConfigRpcCurl)
138       self.hcp = pool
139
140     return pool
141
142
143 # Remove module alias (see above)
144 del _threading
145
146
147 _thread_local = _RpcThreadLocal()
148
149
150 def _RpcTimeout(secs):
151   """Timeout decorator.
152
153   When applied to a rpc call_* function, it updates the global timeout
154   table with the given function/timeout.
155
156   """
157   def decorator(f):
158     name = f.__name__
159     assert name.startswith("call_")
160     _TIMEOUTS[name[len("call_"):]] = secs
161     return f
162   return decorator
163
164
165 def RunWithRPC(fn):
166   """RPC-wrapper decorator.
167
168   When applied to a function, it runs it with the RPC system
169   initialized, and it shutsdown the system afterwards. This means the
170   function must be called without RPC being initialized.
171
172   """
173   def wrapper(*args, **kwargs):
174     Init()
175     try:
176       return fn(*args, **kwargs)
177     finally:
178       Shutdown()
179   return wrapper
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     for attr_name in ["call", "data", "fail_msg",
230                       "node", "offline", "payload"]:
231       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232
233   @staticmethod
234   def _EnsureErr(val):
235     """Helper to ensure we return a 'True' value for error."""
236     if val:
237       return val
238     else:
239       return "No error information"
240
241   def Raise(self, msg, prereq=False, ecode=None):
242     """If the result has failed, raise an OpExecError.
243
244     This is used so that LU code doesn't have to check for each
245     result, but instead can call this function.
246
247     """
248     if not self.fail_msg:
249       return
250
251     if not msg: # one could pass None for default message
252       msg = ("Call '%s' to node '%s' has failed: %s" %
253              (self.call, self.node, self.fail_msg))
254     else:
255       msg = "%s: %s" % (msg, self.fail_msg)
256     if prereq:
257       ec = errors.OpPrereqError
258     else:
259       ec = errors.OpExecError
260     if ecode is not None:
261       args = (msg, ecode)
262     else:
263       args = (msg, )
264     raise ec(*args) # pylint: disable-msg=W0142
265
266
267 def _AddressLookup(node_list,
268                    ssc=ssconf.SimpleStore,
269                    nslookup_fn=netutils.Hostname.GetIP):
270   """Return addresses for given node names.
271
272   @type node_list: list
273   @param node_list: List of node names
274   @type ssc: class
275   @param ssc: SimpleStore class that is used to obtain node->ip mappings
276   @type nslookup_fn: callable
277   @param nslookup_fn: function use to do NS lookup
278   @rtype: list of addresses and/or None's
279   @returns: List of corresponding addresses, if found
280
281   """
282   ss = ssc()
283   iplist = ss.GetNodePrimaryIPList()
284   family = ss.GetPrimaryIPFamily()
285   addresses = []
286   ipmap = dict(entry.split() for entry in iplist)
287   for node in node_list:
288     address = ipmap.get(node)
289     if address is None:
290       address = nslookup_fn(node, family=family)
291     addresses.append(address)
292
293   return addresses
294
295
296 class Client:
297   """RPC Client class.
298
299   This class, given a (remote) method name, a list of parameters and a
300   list of nodes, will contact (in parallel) all nodes, and return a
301   dict of results (key: node name, value: result).
302
303   One current bug is that generic failure is still signaled by
304   'False' result, which is not good. This overloading of values can
305   cause bugs.
306
307   """
308   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
310                                     " timeouts table")
311     self.procedure = procedure
312     self.body = body
313     self.port = port
314     self._request = {}
315     self._address_lookup_fn = address_lookup_fn
316
317   def ConnectList(self, node_list, address_list=None, read_timeout=None):
318     """Add a list of nodes to the target nodes.
319
320     @type node_list: list
321     @param node_list: the list of node names to connect
322     @type address_list: list or None
323     @keyword address_list: either None or a list with node addresses,
324         which must have the same length as the node list
325     @type read_timeout: int
326     @param read_timeout: overwrites default timeout for operation
327
328     """
329     if address_list is None:
330       # Always use IP address instead of node name
331       address_list = self._address_lookup_fn(node_list)
332
333     assert len(node_list) == len(address_list), \
334            "Name and address lists must have the same length"
335
336     for node, address in zip(node_list, address_list):
337       self.ConnectNode(node, address, read_timeout=read_timeout)
338
339   def ConnectNode(self, name, address=None, read_timeout=None):
340     """Add a node to the target list.
341
342     @type name: str
343     @param name: the node name
344     @type address: str
345     @param address: the node address, if known
346     @type read_timeout: int
347     @param read_timeout: overwrites default timeout for operation
348
349     """
350     if address is None:
351       # Always use IP address instead of node name
352       address = self._address_lookup_fn([name])[0]
353
354     assert(address is not None)
355
356     if read_timeout is None:
357       read_timeout = _TIMEOUTS[self.procedure]
358
359     self._request[name] = \
360       http.client.HttpClientRequest(str(address), self.port,
361                                     http.HTTP_PUT, str("/%s" % self.procedure),
362                                     headers=_RPC_CLIENT_HEADERS,
363                                     post_data=str(self.body),
364                                     read_timeout=read_timeout)
365
366   def GetResults(self, http_pool=None):
367     """Call nodes and return results.
368
369     @rtype: list
370     @return: List of RPC results
371
372     """
373     if not http_pool:
374       http_pool = _thread_local.GetHttpClientPool()
375
376     http_pool.ProcessRequests(self._request.values())
377
378     results = {}
379
380     for name, req in self._request.iteritems():
381       if req.success and req.resp_status_code == http.HTTP_OK:
382         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383                                   node=name, call=self.procedure)
384         continue
385
386       # TODO: Better error reporting
387       if req.error:
388         msg = req.error
389       else:
390         msg = req.resp_body
391
392       logging.error("RPC error in %s from node %s: %s",
393                     self.procedure, name, msg)
394       results[name] = RpcResult(data=msg, failed=True, node=name,
395                                 call=self.procedure)
396
397     return results
398
399
400 def _EncodeImportExportIO(ieio, ieioargs):
401   """Encodes import/export I/O information.
402
403   """
404   if ieio == constants.IEIO_RAW_DISK:
405     assert len(ieioargs) == 1
406     return (ieioargs[0].ToDict(), )
407
408   if ieio == constants.IEIO_SCRIPT:
409     assert len(ieioargs) == 2
410     return (ieioargs[0].ToDict(), ieioargs[1])
411
412   return ieioargs
413
414
415 class RpcRunner(object):
416   """RPC runner class"""
417
418   def __init__(self, cfg):
419     """Initialized the rpc runner.
420
421     @type cfg:  C{config.ConfigWriter}
422     @param cfg: the configuration object that will be used to get data
423                 about the cluster
424
425     """
426     self._cfg = cfg
427     self.port = netutils.GetDaemonPort(constants.NODED)
428
429   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430     """Convert the given instance to a dict.
431
432     This is done via the instance's ToDict() method and additionally
433     we fill the hvparams with the cluster defaults.
434
435     @type instance: L{objects.Instance}
436     @param instance: an Instance object
437     @type hvp: dict or None
438     @param hvp: a dictionary with overridden hypervisor parameters
439     @type bep: dict or None
440     @param bep: a dictionary with overridden backend parameters
441     @type osp: dict or None
442     @param osp: a dictionary with overridden os parameters
443     @rtype: dict
444     @return: the instance dict, with the hvparams filled with the
445         cluster defaults
446
447     """
448     idict = instance.ToDict()
449     cluster = self._cfg.GetClusterInfo()
450     idict["hvparams"] = cluster.FillHV(instance)
451     if hvp is not None:
452       idict["hvparams"].update(hvp)
453     idict["beparams"] = cluster.FillBE(instance)
454     if bep is not None:
455       idict["beparams"].update(bep)
456     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
457     if osp is not None:
458       idict["osparams"].update(osp)
459     for nic in idict["nics"]:
460       nic['nicparams'] = objects.FillDict(
461         cluster.nicparams[constants.PP_DEFAULT],
462         nic['nicparams'])
463     return idict
464
465   def _ConnectList(self, client, node_list, call, read_timeout=None):
466     """Helper for computing node addresses.
467
468     @type client: L{ganeti.rpc.Client}
469     @param client: a C{Client} instance
470     @type node_list: list
471     @param node_list: the node list we should connect
472     @type call: string
473     @param call: the name of the remote procedure call, for filling in
474         correctly any eventual offline nodes' results
475     @type read_timeout: int
476     @param read_timeout: overwrites the default read timeout for the
477         given operation
478
479     """
480     all_nodes = self._cfg.GetAllNodesInfo()
481     name_list = []
482     addr_list = []
483     skip_dict = {}
484     for node in node_list:
485       if node in all_nodes:
486         if all_nodes[node].offline:
487           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
488           continue
489         val = all_nodes[node].primary_ip
490       else:
491         val = None
492       addr_list.append(val)
493       name_list.append(node)
494     if name_list:
495       client.ConnectList(name_list, address_list=addr_list,
496                          read_timeout=read_timeout)
497     return skip_dict
498
499   def _ConnectNode(self, client, node, call, read_timeout=None):
500     """Helper for computing one node's address.
501
502     @type client: L{ganeti.rpc.Client}
503     @param client: a C{Client} instance
504     @type node: str
505     @param node: the node we should connect
506     @type call: string
507     @param call: the name of the remote procedure call, for filling in
508         correctly any eventual offline nodes' results
509     @type read_timeout: int
510     @param read_timeout: overwrites the default read timeout for the
511         given operation
512
513     """
514     node_info = self._cfg.GetNodeInfo(node)
515     if node_info is not None:
516       if node_info.offline:
517         return RpcResult(node=node, offline=True, call=call)
518       addr = node_info.primary_ip
519     else:
520       addr = None
521     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
522
523   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524     """Helper for making a multi-node call
525
526     """
527     body = serializer.DumpJson(args, indent=False)
528     c = Client(procedure, body, self.port)
529     skip_dict = self._ConnectList(c, node_list, procedure,
530                                   read_timeout=read_timeout)
531     skip_dict.update(c.GetResults())
532     return skip_dict
533
534   @classmethod
535   def _StaticMultiNodeCall(cls, node_list, procedure, args,
536                            address_list=None, read_timeout=None):
537     """Helper for making a multi-node static call
538
539     """
540     body = serializer.DumpJson(args, indent=False)
541     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
542     c.ConnectList(node_list, address_list=address_list,
543                   read_timeout=read_timeout)
544     return c.GetResults()
545
546   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547     """Helper for making a single-node call
548
549     """
550     body = serializer.DumpJson(args, indent=False)
551     c = Client(procedure, body, self.port)
552     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
553     if result is None:
554       # we did connect, node is not offline
555       result = c.GetResults()[node]
556     return result
557
558   @classmethod
559   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
560     """Helper for making a single-node static call
561
562     """
563     body = serializer.DumpJson(args, indent=False)
564     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
565     c.ConnectNode(node, read_timeout=read_timeout)
566     return c.GetResults()[node]
567
568   @staticmethod
569   def _Compress(data):
570     """Compresses a string for transport over RPC.
571
572     Small amounts of data are not compressed.
573
574     @type data: str
575     @param data: Data
576     @rtype: tuple
577     @return: Encoded data to send
578
579     """
580     # Small amounts of data are not compressed
581     if len(data) < 512:
582       return (constants.RPC_ENCODING_NONE, data)
583
584     # Compress with zlib and encode in base64
585     return (constants.RPC_ENCODING_ZLIB_BASE64,
586             base64.b64encode(zlib.compress(data, 3)))
587
588   #
589   # Begin RPC calls
590   #
591
592   @_RpcTimeout(_TMO_URGENT)
593   def call_bdev_sizes(self, node_list, devices):
594     """Gets the sizes of requested block devices present on a node
595
596     This is a multi-node call.
597
598     """
599     return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
600
601   @_RpcTimeout(_TMO_URGENT)
602   def call_lv_list(self, node_list, vg_name):
603     """Gets the logical volumes present in a given volume group.
604
605     This is a multi-node call.
606
607     """
608     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
609
610   @_RpcTimeout(_TMO_URGENT)
611   def call_vg_list(self, node_list):
612     """Gets the volume group list.
613
614     This is a multi-node call.
615
616     """
617     return self._MultiNodeCall(node_list, "vg_list", [])
618
619   @_RpcTimeout(_TMO_NORMAL)
620   def call_storage_list(self, node_list, su_name, su_args, name, fields):
621     """Get list of storage units.
622
623     This is a multi-node call.
624
625     """
626     return self._MultiNodeCall(node_list, "storage_list",
627                                [su_name, su_args, name, fields])
628
629   @_RpcTimeout(_TMO_NORMAL)
630   def call_storage_modify(self, node, su_name, su_args, name, changes):
631     """Modify a storage unit.
632
633     This is a single-node call.
634
635     """
636     return self._SingleNodeCall(node, "storage_modify",
637                                 [su_name, su_args, name, changes])
638
639   @_RpcTimeout(_TMO_NORMAL)
640   def call_storage_execute(self, node, su_name, su_args, name, op):
641     """Executes an operation on a storage unit.
642
643     This is a single-node call.
644
645     """
646     return self._SingleNodeCall(node, "storage_execute",
647                                 [su_name, su_args, name, op])
648
649   @_RpcTimeout(_TMO_URGENT)
650   def call_bridges_exist(self, node, bridges_list):
651     """Checks if a node has all the bridges given.
652
653     This method checks if all bridges given in the bridges_list are
654     present on the remote node, so that an instance that uses interfaces
655     on those bridges can be started.
656
657     This is a single-node call.
658
659     """
660     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
661
662   @_RpcTimeout(_TMO_NORMAL)
663   def call_instance_start(self, node, instance, hvp, bep):
664     """Starts an instance.
665
666     This is a single-node call.
667
668     """
669     idict = self._InstDict(instance, hvp=hvp, bep=bep)
670     return self._SingleNodeCall(node, "instance_start", [idict])
671
672   @_RpcTimeout(_TMO_NORMAL)
673   def call_instance_shutdown(self, node, instance, timeout):
674     """Stops an instance.
675
676     This is a single-node call.
677
678     """
679     return self._SingleNodeCall(node, "instance_shutdown",
680                                 [self._InstDict(instance), timeout])
681
682   @_RpcTimeout(_TMO_NORMAL)
683   def call_migration_info(self, node, instance):
684     """Gather the information necessary to prepare an instance migration.
685
686     This is a single-node call.
687
688     @type node: string
689     @param node: the node on which the instance is currently running
690     @type instance: C{objects.Instance}
691     @param instance: the instance definition
692
693     """
694     return self._SingleNodeCall(node, "migration_info",
695                                 [self._InstDict(instance)])
696
697   @_RpcTimeout(_TMO_NORMAL)
698   def call_accept_instance(self, node, instance, info, target):
699     """Prepare a node to accept an instance.
700
701     This is a single-node call.
702
703     @type node: string
704     @param node: the target node for the migration
705     @type instance: C{objects.Instance}
706     @param instance: the instance definition
707     @type info: opaque/hypervisor specific (string/data)
708     @param info: result for the call_migration_info call
709     @type target: string
710     @param target: target hostname (usually ip address) (on the node itself)
711
712     """
713     return self._SingleNodeCall(node, "accept_instance",
714                                 [self._InstDict(instance), info, target])
715
716   @_RpcTimeout(_TMO_NORMAL)
717   def call_finalize_migration(self, node, instance, info, success):
718     """Finalize any target-node migration specific operation.
719
720     This is called both in case of a successful migration and in case of error
721     (in which case it should abort the migration).
722
723     This is a single-node call.
724
725     @type node: string
726     @param node: the target node for the migration
727     @type instance: C{objects.Instance}
728     @param instance: the instance definition
729     @type info: opaque/hypervisor specific (string/data)
730     @param info: result for the call_migration_info call
731     @type success: boolean
732     @param success: whether the migration was a success or a failure
733
734     """
735     return self._SingleNodeCall(node, "finalize_migration",
736                                 [self._InstDict(instance), info, success])
737
738   @_RpcTimeout(_TMO_SLOW)
739   def call_instance_migrate(self, node, instance, target, live):
740     """Migrate an instance.
741
742     This is a single-node call.
743
744     @type node: string
745     @param node: the node on which the instance is currently running
746     @type instance: C{objects.Instance}
747     @param instance: the instance definition
748     @type target: string
749     @param target: the target node name
750     @type live: boolean
751     @param live: whether the migration should be done live or not (the
752         interpretation of this parameter is left to the hypervisor)
753
754     """
755     return self._SingleNodeCall(node, "instance_migrate",
756                                 [self._InstDict(instance), target, live])
757
758   @_RpcTimeout(_TMO_NORMAL)
759   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
760     """Reboots an instance.
761
762     This is a single-node call.
763
764     """
765     return self._SingleNodeCall(node, "instance_reboot",
766                                 [self._InstDict(inst), reboot_type,
767                                  shutdown_timeout])
768
769   @_RpcTimeout(_TMO_1DAY)
770   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
771     """Installs an OS on the given instance.
772
773     This is a single-node call.
774
775     """
776     return self._SingleNodeCall(node, "instance_os_add",
777                                 [self._InstDict(inst, osp=osparams),
778                                  reinstall, debug])
779
780   @_RpcTimeout(_TMO_SLOW)
781   def call_instance_run_rename(self, node, inst, old_name, debug):
782     """Run the OS rename script for an instance.
783
784     This is a single-node call.
785
786     """
787     return self._SingleNodeCall(node, "instance_run_rename",
788                                 [self._InstDict(inst), old_name, debug])
789
790   @_RpcTimeout(_TMO_URGENT)
791   def call_instance_info(self, node, instance, hname):
792     """Returns information about a single instance.
793
794     This is a single-node call.
795
796     @type node: list
797     @param node: the list of nodes to query
798     @type instance: string
799     @param instance: the instance name
800     @type hname: string
801     @param hname: the hypervisor type of the instance
802
803     """
804     return self._SingleNodeCall(node, "instance_info", [instance, hname])
805
806   @_RpcTimeout(_TMO_NORMAL)
807   def call_instance_migratable(self, node, instance):
808     """Checks whether the given instance can be migrated.
809
810     This is a single-node call.
811
812     @param node: the node to query
813     @type instance: L{objects.Instance}
814     @param instance: the instance to check
815
816
817     """
818     return self._SingleNodeCall(node, "instance_migratable",
819                                 [self._InstDict(instance)])
820
821   @_RpcTimeout(_TMO_URGENT)
822   def call_all_instances_info(self, node_list, hypervisor_list):
823     """Returns information about all instances on the given nodes.
824
825     This is a multi-node call.
826
827     @type node_list: list
828     @param node_list: the list of nodes to query
829     @type hypervisor_list: list
830     @param hypervisor_list: the hypervisors to query for instances
831
832     """
833     return self._MultiNodeCall(node_list, "all_instances_info",
834                                [hypervisor_list])
835
836   @_RpcTimeout(_TMO_URGENT)
837   def call_instance_list(self, node_list, hypervisor_list):
838     """Returns the list of running instances on a given node.
839
840     This is a multi-node call.
841
842     @type node_list: list
843     @param node_list: the list of nodes to query
844     @type hypervisor_list: list
845     @param hypervisor_list: the hypervisors to query for instances
846
847     """
848     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
849
850   @_RpcTimeout(_TMO_FAST)
851   def call_node_tcp_ping(self, node, source, target, port, timeout,
852                          live_port_needed):
853     """Do a TcpPing on the remote node
854
855     This is a single-node call.
856
857     """
858     return self._SingleNodeCall(node, "node_tcp_ping",
859                                 [source, target, port, timeout,
860                                  live_port_needed])
861
862   @_RpcTimeout(_TMO_FAST)
863   def call_node_has_ip_address(self, node, address):
864     """Checks if a node has the given IP address.
865
866     This is a single-node call.
867
868     """
869     return self._SingleNodeCall(node, "node_has_ip_address", [address])
870
871   @_RpcTimeout(_TMO_URGENT)
872   def call_node_info(self, node_list, vg_name, hypervisor_type):
873     """Return node information.
874
875     This will return memory information and volume group size and free
876     space.
877
878     This is a multi-node call.
879
880     @type node_list: list
881     @param node_list: the list of nodes to query
882     @type vg_name: C{string}
883     @param vg_name: the name of the volume group to ask for disk space
884         information
885     @type hypervisor_type: C{str}
886     @param hypervisor_type: the name of the hypervisor to ask for
887         memory information
888
889     """
890     return self._MultiNodeCall(node_list, "node_info",
891                                [vg_name, hypervisor_type])
892
893   @_RpcTimeout(_TMO_NORMAL)
894   def call_etc_hosts_modify(self, node, mode, name, ip):
895     """Modify hosts file with name
896
897     @type node: string
898     @param node: The node to call
899     @type mode: string
900     @param mode: The mode to operate. Currently "add" or "remove"
901     @type name: string
902     @param name: The host name to be modified
903     @type ip: string
904     @param ip: The ip of the entry (just valid if mode is "add")
905
906     """
907     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
908
909   @_RpcTimeout(_TMO_NORMAL)
910   def call_node_verify(self, node_list, checkdict, cluster_name):
911     """Request verification of given parameters.
912
913     This is a multi-node call.
914
915     """
916     return self._MultiNodeCall(node_list, "node_verify",
917                                [checkdict, cluster_name])
918
919   @classmethod
920   @_RpcTimeout(_TMO_FAST)
921   def call_node_start_master(cls, node, start_daemons, no_voting):
922     """Tells a node to activate itself as a master.
923
924     This is a single-node call.
925
926     """
927     return cls._StaticSingleNodeCall(node, "node_start_master",
928                                      [start_daemons, no_voting])
929
930   @classmethod
931   @_RpcTimeout(_TMO_FAST)
932   def call_node_stop_master(cls, node, stop_daemons):
933     """Tells a node to demote itself from master status.
934
935     This is a single-node call.
936
937     """
938     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
939
940   @classmethod
941   @_RpcTimeout(_TMO_URGENT)
942   def call_master_info(cls, node_list):
943     """Query master info.
944
945     This is a multi-node call.
946
947     """
948     # TODO: should this method query down nodes?
949     return cls._StaticMultiNodeCall(node_list, "master_info", [])
950
951   @classmethod
952   @_RpcTimeout(_TMO_URGENT)
953   def call_version(cls, node_list):
954     """Query node version.
955
956     This is a multi-node call.
957
958     """
959     return cls._StaticMultiNodeCall(node_list, "version", [])
960
961   @_RpcTimeout(_TMO_NORMAL)
962   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
963     """Request creation of a given block device.
964
965     This is a single-node call.
966
967     """
968     return self._SingleNodeCall(node, "blockdev_create",
969                                 [bdev.ToDict(), size, owner, on_primary, info])
970
971   @_RpcTimeout(_TMO_SLOW)
972   def call_blockdev_wipe(self, node, bdev, offset, size):
973     """Request wipe at given offset with given size of a block device.
974
975     This is a single-node call.
976
977     """
978     return self._SingleNodeCall(node, "blockdev_wipe",
979                                 [bdev.ToDict(), offset, size])
980
981   @_RpcTimeout(_TMO_NORMAL)
982   def call_blockdev_remove(self, node, bdev):
983     """Request removal of a given block device.
984
985     This is a single-node call.
986
987     """
988     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
989
990   @_RpcTimeout(_TMO_NORMAL)
991   def call_blockdev_rename(self, node, devlist):
992     """Request rename of the given block devices.
993
994     This is a single-node call.
995
996     """
997     return self._SingleNodeCall(node, "blockdev_rename",
998                                 [(d.ToDict(), uid) for d, uid in devlist])
999
1000   @_RpcTimeout(_TMO_NORMAL)
1001   def call_blockdev_pause_resume_sync(self, node, disks, pause):
1002     """Request a pause/resume of given block device.
1003
1004     This is a single-node call.
1005
1006     """
1007     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1008                                 [[bdev.ToDict() for bdev in disks], pause])
1009
1010   @_RpcTimeout(_TMO_NORMAL)
1011   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1012     """Request assembling of a given block device.
1013
1014     This is a single-node call.
1015
1016     """
1017     return self._SingleNodeCall(node, "blockdev_assemble",
1018                                 [disk.ToDict(), owner, on_primary, idx])
1019
1020   @_RpcTimeout(_TMO_NORMAL)
1021   def call_blockdev_shutdown(self, node, disk):
1022     """Request shutdown of a given block device.
1023
1024     This is a single-node call.
1025
1026     """
1027     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1028
1029   @_RpcTimeout(_TMO_NORMAL)
1030   def call_blockdev_addchildren(self, node, bdev, ndevs):
1031     """Request adding a list of children to a (mirroring) device.
1032
1033     This is a single-node call.
1034
1035     """
1036     return self._SingleNodeCall(node, "blockdev_addchildren",
1037                                 [bdev.ToDict(),
1038                                  [disk.ToDict() for disk in ndevs]])
1039
1040   @_RpcTimeout(_TMO_NORMAL)
1041   def call_blockdev_removechildren(self, node, bdev, ndevs):
1042     """Request removing a list of children from a (mirroring) device.
1043
1044     This is a single-node call.
1045
1046     """
1047     return self._SingleNodeCall(node, "blockdev_removechildren",
1048                                 [bdev.ToDict(),
1049                                  [disk.ToDict() for disk in ndevs]])
1050
1051   @_RpcTimeout(_TMO_NORMAL)
1052   def call_blockdev_getmirrorstatus(self, node, disks):
1053     """Request status of a (mirroring) device.
1054
1055     This is a single-node call.
1056
1057     """
1058     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1059                                   [dsk.ToDict() for dsk in disks])
1060     if not result.fail_msg:
1061       result.payload = [objects.BlockDevStatus.FromDict(i)
1062                         for i in result.payload]
1063     return result
1064
1065   @_RpcTimeout(_TMO_NORMAL)
1066   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1067     """Request status of (mirroring) devices from multiple nodes.
1068
1069     This is a multi-node call.
1070
1071     """
1072     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1073                                  [dict((name, [dsk.ToDict() for dsk in disks])
1074                                        for name, disks in node_disks.items())])
1075     for nres in result.values():
1076       if nres.fail_msg:
1077         continue
1078
1079       for idx, (success, status) in enumerate(nres.payload):
1080         if success:
1081           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1082
1083     return result
1084
1085   @_RpcTimeout(_TMO_NORMAL)
1086   def call_blockdev_find(self, node, disk):
1087     """Request identification of a given block device.
1088
1089     This is a single-node call.
1090
1091     """
1092     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1093     if not result.fail_msg and result.payload is not None:
1094       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1095     return result
1096
1097   @_RpcTimeout(_TMO_NORMAL)
1098   def call_blockdev_close(self, node, instance_name, disks):
1099     """Closes the given block devices.
1100
1101     This is a single-node call.
1102
1103     """
1104     params = [instance_name, [cf.ToDict() for cf in disks]]
1105     return self._SingleNodeCall(node, "blockdev_close", params)
1106
1107   @_RpcTimeout(_TMO_NORMAL)
1108   def call_blockdev_getsize(self, node, disks):
1109     """Returns the size of the given disks.
1110
1111     This is a single-node call.
1112
1113     """
1114     params = [[cf.ToDict() for cf in disks]]
1115     return self._SingleNodeCall(node, "blockdev_getsize", params)
1116
1117   @_RpcTimeout(_TMO_NORMAL)
1118   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1119     """Disconnects the network of the given drbd devices.
1120
1121     This is a multi-node call.
1122
1123     """
1124     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1125                                [nodes_ip, [cf.ToDict() for cf in disks]])
1126
1127   @_RpcTimeout(_TMO_NORMAL)
1128   def call_drbd_attach_net(self, node_list, nodes_ip,
1129                            disks, instance_name, multimaster):
1130     """Disconnects the given drbd devices.
1131
1132     This is a multi-node call.
1133
1134     """
1135     return self._MultiNodeCall(node_list, "drbd_attach_net",
1136                                [nodes_ip, [cf.ToDict() for cf in disks],
1137                                 instance_name, multimaster])
1138
1139   @_RpcTimeout(_TMO_SLOW)
1140   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1141     """Waits for the synchronization of drbd devices is complete.
1142
1143     This is a multi-node call.
1144
1145     """
1146     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1147                                [nodes_ip, [cf.ToDict() for cf in disks]])
1148
1149   @_RpcTimeout(_TMO_URGENT)
1150   def call_drbd_helper(self, node_list):
1151     """Gets drbd helper.
1152
1153     This is a multi-node call.
1154
1155     """
1156     return self._MultiNodeCall(node_list, "drbd_helper", [])
1157
1158   @classmethod
1159   @_RpcTimeout(_TMO_NORMAL)
1160   def call_upload_file(cls, node_list, file_name, address_list=None):
1161     """Upload a file.
1162
1163     The node will refuse the operation in case the file is not on the
1164     approved file list.
1165
1166     This is a multi-node call.
1167
1168     @type node_list: list
1169     @param node_list: the list of node names to upload to
1170     @type file_name: str
1171     @param file_name: the filename to upload
1172     @type address_list: list or None
1173     @keyword address_list: an optional list of node addresses, in order
1174         to optimize the RPC speed
1175
1176     """
1177     file_contents = utils.ReadFile(file_name)
1178     data = cls._Compress(file_contents)
1179     st = os.stat(file_name)
1180     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1181               st.st_atime, st.st_mtime]
1182     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1183                                     address_list=address_list)
1184
1185   @classmethod
1186   @_RpcTimeout(_TMO_NORMAL)
1187   def call_write_ssconf_files(cls, node_list, values):
1188     """Write ssconf files.
1189
1190     This is a multi-node call.
1191
1192     """
1193     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1194
1195   @_RpcTimeout(_TMO_NORMAL)
1196   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1197     """Runs OOB.
1198
1199     This is a single-node call.
1200
1201     """
1202     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1203                                                   remote_node, timeout])
1204
1205   @_RpcTimeout(_TMO_FAST)
1206   def call_os_diagnose(self, node_list):
1207     """Request a diagnose of OS definitions.
1208
1209     This is a multi-node call.
1210
1211     """
1212     return self._MultiNodeCall(node_list, "os_diagnose", [])
1213
1214   @_RpcTimeout(_TMO_FAST)
1215   def call_os_get(self, node, name):
1216     """Returns an OS definition.
1217
1218     This is a single-node call.
1219
1220     """
1221     result = self._SingleNodeCall(node, "os_get", [name])
1222     if not result.fail_msg and isinstance(result.payload, dict):
1223       result.payload = objects.OS.FromDict(result.payload)
1224     return result
1225
1226   @_RpcTimeout(_TMO_FAST)
1227   def call_os_validate(self, required, nodes, name, checks, params):
1228     """Run a validation routine for a given OS.
1229
1230     This is a multi-node call.
1231
1232     """
1233     return self._MultiNodeCall(nodes, "os_validate",
1234                                [required, name, checks, params])
1235
1236   @_RpcTimeout(_TMO_NORMAL)
1237   def call_hooks_runner(self, node_list, hpath, phase, env):
1238     """Call the hooks runner.
1239
1240     Args:
1241       - op: the OpCode instance
1242       - env: a dictionary with the environment
1243
1244     This is a multi-node call.
1245
1246     """
1247     params = [hpath, phase, env]
1248     return self._MultiNodeCall(node_list, "hooks_runner", params)
1249
1250   @_RpcTimeout(_TMO_NORMAL)
1251   def call_iallocator_runner(self, node, name, idata):
1252     """Call an iallocator on a remote node
1253
1254     Args:
1255       - name: the iallocator name
1256       - input: the json-encoded input string
1257
1258     This is a single-node call.
1259
1260     """
1261     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1262
1263   @_RpcTimeout(_TMO_NORMAL)
1264   def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1265     """Request a snapshot of the given block device.
1266
1267     This is a single-node call.
1268
1269     """
1270     return self._SingleNodeCall(node, "blockdev_grow",
1271                                 [cf_bdev.ToDict(), amount, dryrun])
1272
1273   @_RpcTimeout(_TMO_1DAY)
1274   def call_blockdev_export(self, node, cf_bdev,
1275                            dest_node, dest_path, cluster_name):
1276     """Export a given disk to another node.
1277
1278     This is a single-node call.
1279
1280     """
1281     return self._SingleNodeCall(node, "blockdev_export",
1282                                 [cf_bdev.ToDict(), dest_node, dest_path,
1283                                  cluster_name])
1284
1285   @_RpcTimeout(_TMO_NORMAL)
1286   def call_blockdev_snapshot(self, node, cf_bdev):
1287     """Request a snapshot of the given block device.
1288
1289     This is a single-node call.
1290
1291     """
1292     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1293
1294   @_RpcTimeout(_TMO_NORMAL)
1295   def call_finalize_export(self, node, instance, snap_disks):
1296     """Request the completion of an export operation.
1297
1298     This writes the export config file, etc.
1299
1300     This is a single-node call.
1301
1302     """
1303     flat_disks = []
1304     for disk in snap_disks:
1305       if isinstance(disk, bool):
1306         flat_disks.append(disk)
1307       else:
1308         flat_disks.append(disk.ToDict())
1309
1310     return self._SingleNodeCall(node, "finalize_export",
1311                                 [self._InstDict(instance), flat_disks])
1312
1313   @_RpcTimeout(_TMO_FAST)
1314   def call_export_info(self, node, path):
1315     """Queries the export information in a given path.
1316
1317     This is a single-node call.
1318
1319     """
1320     return self._SingleNodeCall(node, "export_info", [path])
1321
1322   @_RpcTimeout(_TMO_FAST)
1323   def call_export_list(self, node_list):
1324     """Gets the stored exports list.
1325
1326     This is a multi-node call.
1327
1328     """
1329     return self._MultiNodeCall(node_list, "export_list", [])
1330
1331   @_RpcTimeout(_TMO_FAST)
1332   def call_export_remove(self, node, export):
1333     """Requests removal of a given export.
1334
1335     This is a single-node call.
1336
1337     """
1338     return self._SingleNodeCall(node, "export_remove", [export])
1339
1340   @classmethod
1341   @_RpcTimeout(_TMO_NORMAL)
1342   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1343     """Requests a node to clean the cluster information it has.
1344
1345     This will remove the configuration information from the ganeti data
1346     dir.
1347
1348     This is a single-node call.
1349
1350     """
1351     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1352                                      [modify_ssh_setup])
1353
1354   @_RpcTimeout(_TMO_FAST)
1355   def call_node_volumes(self, node_list):
1356     """Gets all volumes on node(s).
1357
1358     This is a multi-node call.
1359
1360     """
1361     return self._MultiNodeCall(node_list, "node_volumes", [])
1362
1363   @_RpcTimeout(_TMO_FAST)
1364   def call_node_demote_from_mc(self, node):
1365     """Demote a node from the master candidate role.
1366
1367     This is a single-node call.
1368
1369     """
1370     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1371
1372   @_RpcTimeout(_TMO_NORMAL)
1373   def call_node_powercycle(self, node, hypervisor):
1374     """Tries to powercycle a node.
1375
1376     This is a single-node call.
1377
1378     """
1379     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1380
1381   @_RpcTimeout(None)
1382   def call_test_delay(self, node_list, duration):
1383     """Sleep for a fixed time on given node(s).
1384
1385     This is a multi-node call.
1386
1387     """
1388     return self._MultiNodeCall(node_list, "test_delay", [duration],
1389                                read_timeout=int(duration + 5))
1390
1391   @_RpcTimeout(_TMO_FAST)
1392   def call_file_storage_dir_create(self, node, file_storage_dir):
1393     """Create the given file storage directory.
1394
1395     This is a single-node call.
1396
1397     """
1398     return self._SingleNodeCall(node, "file_storage_dir_create",
1399                                 [file_storage_dir])
1400
1401   @_RpcTimeout(_TMO_FAST)
1402   def call_file_storage_dir_remove(self, node, file_storage_dir):
1403     """Remove the given file storage directory.
1404
1405     This is a single-node call.
1406
1407     """
1408     return self._SingleNodeCall(node, "file_storage_dir_remove",
1409                                 [file_storage_dir])
1410
1411   @_RpcTimeout(_TMO_FAST)
1412   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1413                                    new_file_storage_dir):
1414     """Rename file storage directory.
1415
1416     This is a single-node call.
1417
1418     """
1419     return self._SingleNodeCall(node, "file_storage_dir_rename",
1420                                 [old_file_storage_dir, new_file_storage_dir])
1421
1422   @classmethod
1423   @_RpcTimeout(_TMO_URGENT)
1424   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1425     """Update job queue.
1426
1427     This is a multi-node call.
1428
1429     """
1430     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1431                                     [file_name, cls._Compress(content)],
1432                                     address_list=address_list)
1433
1434   @classmethod
1435   @_RpcTimeout(_TMO_NORMAL)
1436   def call_jobqueue_purge(cls, node):
1437     """Purge job queue.
1438
1439     This is a single-node call.
1440
1441     """
1442     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1443
1444   @classmethod
1445   @_RpcTimeout(_TMO_URGENT)
1446   def call_jobqueue_rename(cls, node_list, address_list, rename):
1447     """Rename a job queue file.
1448
1449     This is a multi-node call.
1450
1451     """
1452     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1453                                     address_list=address_list)
1454
1455   @_RpcTimeout(_TMO_NORMAL)
1456   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1457     """Validate the hypervisor params.
1458
1459     This is a multi-node call.
1460
1461     @type node_list: list
1462     @param node_list: the list of nodes to query
1463     @type hvname: string
1464     @param hvname: the hypervisor name
1465     @type hvparams: dict
1466     @param hvparams: the hypervisor parameters to be validated
1467
1468     """
1469     cluster = self._cfg.GetClusterInfo()
1470     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1471     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1472                                [hvname, hv_full])
1473
1474   @_RpcTimeout(_TMO_NORMAL)
1475   def call_x509_cert_create(self, node, validity):
1476     """Creates a new X509 certificate for SSL/TLS.
1477
1478     This is a single-node call.
1479
1480     @type validity: int
1481     @param validity: Validity in seconds
1482
1483     """
1484     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1485
1486   @_RpcTimeout(_TMO_NORMAL)
1487   def call_x509_cert_remove(self, node, name):
1488     """Removes a X509 certificate.
1489
1490     This is a single-node call.
1491
1492     @type name: string
1493     @param name: Certificate name
1494
1495     """
1496     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1497
1498   @_RpcTimeout(_TMO_NORMAL)
1499   def call_import_start(self, node, opts, instance, dest, dest_args):
1500     """Starts a listener for an import.
1501
1502     This is a single-node call.
1503
1504     @type node: string
1505     @param node: Node name
1506     @type instance: C{objects.Instance}
1507     @param instance: Instance object
1508
1509     """
1510     return self._SingleNodeCall(node, "import_start",
1511                                 [opts.ToDict(),
1512                                  self._InstDict(instance), dest,
1513                                  _EncodeImportExportIO(dest, dest_args)])
1514
1515   @_RpcTimeout(_TMO_NORMAL)
1516   def call_export_start(self, node, opts, host, port,
1517                         instance, source, source_args):
1518     """Starts an export daemon.
1519
1520     This is a single-node call.
1521
1522     @type node: string
1523     @param node: Node name
1524     @type instance: C{objects.Instance}
1525     @param instance: Instance object
1526
1527     """
1528     return self._SingleNodeCall(node, "export_start",
1529                                 [opts.ToDict(), host, port,
1530                                  self._InstDict(instance), source,
1531                                  _EncodeImportExportIO(source, source_args)])
1532
1533   @_RpcTimeout(_TMO_FAST)
1534   def call_impexp_status(self, node, names):
1535     """Gets the status of an import or export.
1536
1537     This is a single-node call.
1538
1539     @type node: string
1540     @param node: Node name
1541     @type names: List of strings
1542     @param names: Import/export names
1543     @rtype: List of L{objects.ImportExportStatus} instances
1544     @return: Returns a list of the state of each named import/export or None if
1545              a status couldn't be retrieved
1546
1547     """
1548     result = self._SingleNodeCall(node, "impexp_status", [names])
1549
1550     if not result.fail_msg:
1551       decoded = []
1552
1553       for i in result.payload:
1554         if i is None:
1555           decoded.append(None)
1556           continue
1557         decoded.append(objects.ImportExportStatus.FromDict(i))
1558
1559       result.payload = decoded
1560
1561     return result
1562
1563   @_RpcTimeout(_TMO_NORMAL)
1564   def call_impexp_abort(self, node, name):
1565     """Aborts an import or export.
1566
1567     This is a single-node call.
1568
1569     @type node: string
1570     @param node: Node name
1571     @type name: string
1572     @param name: Import/export name
1573
1574     """
1575     return self._SingleNodeCall(node, "impexp_abort", [name])
1576
1577   @_RpcTimeout(_TMO_NORMAL)
1578   def call_impexp_cleanup(self, node, name):
1579     """Cleans up after an import or export.
1580
1581     This is a single-node call.
1582
1583     @type node: string
1584     @param node: Node name
1585     @type name: string
1586     @param name: Import/export name
1587
1588     """
1589     return self._SingleNodeCall(node, "impexp_cleanup", [name])