Another fix for LUClusterVerifyDisks
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   "Expect:",
59   ]
60
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75
76 _TIMEOUTS = {
77 }
78
79
80 def Init():
81   """Initializes the module-global HTTP client manager.
82
83   Must be called before using any RPC function and while exactly one thread is
84   running.
85
86   """
87   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88   # one thread running. This check is just a safety measure -- it doesn't
89   # cover all cases.
90   assert threading.activeCount() == 1, \
91          "Found more than one active thread when initializing pycURL"
92
93   logging.info("Using PycURL %s", pycurl.version)
94
95   pycurl.global_init(pycurl.GLOBAL_ALL)
96
97
98 def Shutdown():
99   """Stops the module-global HTTP client manager.
100
101   Must be called before quitting the program and while exactly one thread is
102   running.
103
104   """
105   pycurl.global_cleanup()
106
107
108 def _ConfigRpcCurl(curl):
109   noded_cert = str(constants.NODED_CERT_FILE)
110
111   curl.setopt(pycurl.FOLLOWLOCATION, False)
112   curl.setopt(pycurl.CAINFO, noded_cert)
113   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114   curl.setopt(pycurl.SSL_VERIFYPEER, True)
115   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116   curl.setopt(pycurl.SSLCERT, noded_cert)
117   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118   curl.setopt(pycurl.SSLKEY, noded_cert)
119   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
125
126
127 class _RpcThreadLocal(_threading.local):
128   def GetHttpClientPool(self):
129     """Returns a per-thread HTTP client pool.
130
131     @rtype: L{http.client.HttpClientPool}
132
133     """
134     try:
135       pool = self.hcp
136     except AttributeError:
137       pool = http.client.HttpClientPool(_ConfigRpcCurl)
138       self.hcp = pool
139
140     return pool
141
142
143 # Remove module alias (see above)
144 del _threading
145
146
147 _thread_local = _RpcThreadLocal()
148
149
150 def _RpcTimeout(secs):
151   """Timeout decorator.
152
153   When applied to a rpc call_* function, it updates the global timeout
154   table with the given function/timeout.
155
156   """
157   def decorator(f):
158     name = f.__name__
159     assert name.startswith("call_")
160     _TIMEOUTS[name[len("call_"):]] = secs
161     return f
162   return decorator
163
164
165 def RunWithRPC(fn):
166   """RPC-wrapper decorator.
167
168   When applied to a function, it runs it with the RPC system
169   initialized, and it shutsdown the system afterwards. This means the
170   function must be called without RPC being initialized.
171
172   """
173   def wrapper(*args, **kwargs):
174     Init()
175     try:
176       return fn(*args, **kwargs)
177     finally:
178       Shutdown()
179   return wrapper
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     for attr_name in ["call", "data", "fail_msg",
230                       "node", "offline", "payload"]:
231       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232
233   @staticmethod
234   def _EnsureErr(val):
235     """Helper to ensure we return a 'True' value for error."""
236     if val:
237       return val
238     else:
239       return "No error information"
240
241   def Raise(self, msg, prereq=False, ecode=None):
242     """If the result has failed, raise an OpExecError.
243
244     This is used so that LU code doesn't have to check for each
245     result, but instead can call this function.
246
247     """
248     if not self.fail_msg:
249       return
250
251     if not msg: # one could pass None for default message
252       msg = ("Call '%s' to node '%s' has failed: %s" %
253              (self.call, self.node, self.fail_msg))
254     else:
255       msg = "%s: %s" % (msg, self.fail_msg)
256     if prereq:
257       ec = errors.OpPrereqError
258     else:
259       ec = errors.OpExecError
260     if ecode is not None:
261       args = (msg, ecode)
262     else:
263       args = (msg, )
264     raise ec(*args) # pylint: disable-msg=W0142
265
266
267 def _AddressLookup(node_list,
268                    ssc=ssconf.SimpleStore,
269                    nslookup_fn=netutils.Hostname.GetIP):
270   """Return addresses for given node names.
271
272   @type node_list: list
273   @param node_list: List of node names
274   @type ssc: class
275   @param ssc: SimpleStore class that is used to obtain node->ip mappings
276   @type nslookup_fn: callable
277   @param nslookup_fn: function use to do NS lookup
278   @rtype: list of addresses and/or None's
279   @returns: List of corresponding addresses, if found
280
281   """
282   ss = ssc()
283   iplist = ss.GetNodePrimaryIPList()
284   family = ss.GetPrimaryIPFamily()
285   addresses = []
286   ipmap = dict(entry.split() for entry in iplist)
287   for node in node_list:
288     address = ipmap.get(node)
289     if address is None:
290       address = nslookup_fn(node, family=family)
291     addresses.append(address)
292
293   return addresses
294
295
296 class Client:
297   """RPC Client class.
298
299   This class, given a (remote) method name, a list of parameters and a
300   list of nodes, will contact (in parallel) all nodes, and return a
301   dict of results (key: node name, value: result).
302
303   One current bug is that generic failure is still signaled by
304   'False' result, which is not good. This overloading of values can
305   cause bugs.
306
307   """
308   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
310                                     " timeouts table")
311     self.procedure = procedure
312     self.body = body
313     self.port = port
314     self._request = {}
315     self._address_lookup_fn = address_lookup_fn
316
317   def ConnectList(self, node_list, address_list=None, read_timeout=None):
318     """Add a list of nodes to the target nodes.
319
320     @type node_list: list
321     @param node_list: the list of node names to connect
322     @type address_list: list or None
323     @keyword address_list: either None or a list with node addresses,
324         which must have the same length as the node list
325     @type read_timeout: int
326     @param read_timeout: overwrites default timeout for operation
327
328     """
329     if address_list is None:
330       # Always use IP address instead of node name
331       address_list = self._address_lookup_fn(node_list)
332
333     assert len(node_list) == len(address_list), \
334            "Name and address lists must have the same length"
335
336     for node, address in zip(node_list, address_list):
337       self.ConnectNode(node, address, read_timeout=read_timeout)
338
339   def ConnectNode(self, name, address=None, read_timeout=None):
340     """Add a node to the target list.
341
342     @type name: str
343     @param name: the node name
344     @type address: str
345     @param address: the node address, if known
346     @type read_timeout: int
347     @param read_timeout: overwrites default timeout for operation
348
349     """
350     if address is None:
351       # Always use IP address instead of node name
352       address = self._address_lookup_fn([name])[0]
353
354     assert(address is not None)
355
356     if read_timeout is None:
357       read_timeout = _TIMEOUTS[self.procedure]
358
359     self._request[name] = \
360       http.client.HttpClientRequest(str(address), self.port,
361                                     http.HTTP_PUT, str("/%s" % self.procedure),
362                                     headers=_RPC_CLIENT_HEADERS,
363                                     post_data=str(self.body),
364                                     read_timeout=read_timeout)
365
366   def GetResults(self, http_pool=None):
367     """Call nodes and return results.
368
369     @rtype: list
370     @return: List of RPC results
371
372     """
373     if not http_pool:
374       http_pool = _thread_local.GetHttpClientPool()
375
376     http_pool.ProcessRequests(self._request.values())
377
378     results = {}
379
380     for name, req in self._request.iteritems():
381       if req.success and req.resp_status_code == http.HTTP_OK:
382         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383                                   node=name, call=self.procedure)
384         continue
385
386       # TODO: Better error reporting
387       if req.error:
388         msg = req.error
389       else:
390         msg = req.resp_body
391
392       logging.error("RPC error in %s from node %s: %s",
393                     self.procedure, name, msg)
394       results[name] = RpcResult(data=msg, failed=True, node=name,
395                                 call=self.procedure)
396
397     return results
398
399
400 def _EncodeImportExportIO(ieio, ieioargs):
401   """Encodes import/export I/O information.
402
403   """
404   if ieio == constants.IEIO_RAW_DISK:
405     assert len(ieioargs) == 1
406     return (ieioargs[0].ToDict(), )
407
408   if ieio == constants.IEIO_SCRIPT:
409     assert len(ieioargs) == 2
410     return (ieioargs[0].ToDict(), ieioargs[1])
411
412   return ieioargs
413
414
415 class RpcRunner(object):
416   """RPC runner class"""
417
418   def __init__(self, cfg):
419     """Initialized the rpc runner.
420
421     @type cfg:  C{config.ConfigWriter}
422     @param cfg: the configuration object that will be used to get data
423                 about the cluster
424
425     """
426     self._cfg = cfg
427     self.port = netutils.GetDaemonPort(constants.NODED)
428
429   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430     """Convert the given instance to a dict.
431
432     This is done via the instance's ToDict() method and additionally
433     we fill the hvparams with the cluster defaults.
434
435     @type instance: L{objects.Instance}
436     @param instance: an Instance object
437     @type hvp: dict or None
438     @param hvp: a dictionary with overridden hypervisor parameters
439     @type bep: dict or None
440     @param bep: a dictionary with overridden backend parameters
441     @type osp: dict or None
442     @param osp: a dictionary with overridden os parameters
443     @rtype: dict
444     @return: the instance dict, with the hvparams filled with the
445         cluster defaults
446
447     """
448     idict = instance.ToDict()
449     cluster = self._cfg.GetClusterInfo()
450     idict["hvparams"] = cluster.FillHV(instance)
451     if hvp is not None:
452       idict["hvparams"].update(hvp)
453     idict["beparams"] = cluster.FillBE(instance)
454     if bep is not None:
455       idict["beparams"].update(bep)
456     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
457     if osp is not None:
458       idict["osparams"].update(osp)
459     for nic in idict["nics"]:
460       nic['nicparams'] = objects.FillDict(
461         cluster.nicparams[constants.PP_DEFAULT],
462         nic['nicparams'])
463     return idict
464
465   def _ConnectList(self, client, node_list, call, read_timeout=None):
466     """Helper for computing node addresses.
467
468     @type client: L{ganeti.rpc.Client}
469     @param client: a C{Client} instance
470     @type node_list: list
471     @param node_list: the node list we should connect
472     @type call: string
473     @param call: the name of the remote procedure call, for filling in
474         correctly any eventual offline nodes' results
475     @type read_timeout: int
476     @param read_timeout: overwrites the default read timeout for the
477         given operation
478
479     """
480     all_nodes = self._cfg.GetAllNodesInfo()
481     name_list = []
482     addr_list = []
483     skip_dict = {}
484     for node in node_list:
485       if node in all_nodes:
486         if all_nodes[node].offline:
487           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
488           continue
489         val = all_nodes[node].primary_ip
490       else:
491         val = None
492       addr_list.append(val)
493       name_list.append(node)
494     if name_list:
495       client.ConnectList(name_list, address_list=addr_list,
496                          read_timeout=read_timeout)
497     return skip_dict
498
499   def _ConnectNode(self, client, node, call, read_timeout=None):
500     """Helper for computing one node's address.
501
502     @type client: L{ganeti.rpc.Client}
503     @param client: a C{Client} instance
504     @type node: str
505     @param node: the node we should connect
506     @type call: string
507     @param call: the name of the remote procedure call, for filling in
508         correctly any eventual offline nodes' results
509     @type read_timeout: int
510     @param read_timeout: overwrites the default read timeout for the
511         given operation
512
513     """
514     node_info = self._cfg.GetNodeInfo(node)
515     if node_info is not None:
516       if node_info.offline:
517         return RpcResult(node=node, offline=True, call=call)
518       addr = node_info.primary_ip
519     else:
520       addr = None
521     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
522
523   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524     """Helper for making a multi-node call
525
526     """
527     body = serializer.DumpJson(args, indent=False)
528     c = Client(procedure, body, self.port)
529     skip_dict = self._ConnectList(c, node_list, procedure,
530                                   read_timeout=read_timeout)
531     skip_dict.update(c.GetResults())
532     return skip_dict
533
534   @classmethod
535   def _StaticMultiNodeCall(cls, node_list, procedure, args,
536                            address_list=None, read_timeout=None):
537     """Helper for making a multi-node static call
538
539     """
540     body = serializer.DumpJson(args, indent=False)
541     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
542     c.ConnectList(node_list, address_list=address_list,
543                   read_timeout=read_timeout)
544     return c.GetResults()
545
546   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547     """Helper for making a single-node call
548
549     """
550     body = serializer.DumpJson(args, indent=False)
551     c = Client(procedure, body, self.port)
552     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
553     if result is None:
554       # we did connect, node is not offline
555       result = c.GetResults()[node]
556     return result
557
558   @classmethod
559   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
560     """Helper for making a single-node static call
561
562     """
563     body = serializer.DumpJson(args, indent=False)
564     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
565     c.ConnectNode(node, read_timeout=read_timeout)
566     return c.GetResults()[node]
567
568   @staticmethod
569   def _Compress(data):
570     """Compresses a string for transport over RPC.
571
572     Small amounts of data are not compressed.
573
574     @type data: str
575     @param data: Data
576     @rtype: tuple
577     @return: Encoded data to send
578
579     """
580     # Small amounts of data are not compressed
581     if len(data) < 512:
582       return (constants.RPC_ENCODING_NONE, data)
583
584     # Compress with zlib and encode in base64
585     return (constants.RPC_ENCODING_ZLIB_BASE64,
586             base64.b64encode(zlib.compress(data, 3)))
587
588   #
589   # Begin RPC calls
590   #
591
592   @_RpcTimeout(_TMO_URGENT)
593   def call_lv_list(self, node_list, vg_name):
594     """Gets the logical volumes present in a given volume group.
595
596     This is a multi-node call.
597
598     """
599     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
600
601   @_RpcTimeout(_TMO_URGENT)
602   def call_vg_list(self, node_list):
603     """Gets the volume group list.
604
605     This is a multi-node call.
606
607     """
608     return self._MultiNodeCall(node_list, "vg_list", [])
609
610   @_RpcTimeout(_TMO_NORMAL)
611   def call_storage_list(self, node_list, su_name, su_args, name, fields):
612     """Get list of storage units.
613
614     This is a multi-node call.
615
616     """
617     return self._MultiNodeCall(node_list, "storage_list",
618                                [su_name, su_args, name, fields])
619
620   @_RpcTimeout(_TMO_NORMAL)
621   def call_storage_modify(self, node, su_name, su_args, name, changes):
622     """Modify a storage unit.
623
624     This is a single-node call.
625
626     """
627     return self._SingleNodeCall(node, "storage_modify",
628                                 [su_name, su_args, name, changes])
629
630   @_RpcTimeout(_TMO_NORMAL)
631   def call_storage_execute(self, node, su_name, su_args, name, op):
632     """Executes an operation on a storage unit.
633
634     This is a single-node call.
635
636     """
637     return self._SingleNodeCall(node, "storage_execute",
638                                 [su_name, su_args, name, op])
639
640   @_RpcTimeout(_TMO_URGENT)
641   def call_bridges_exist(self, node, bridges_list):
642     """Checks if a node has all the bridges given.
643
644     This method checks if all bridges given in the bridges_list are
645     present on the remote node, so that an instance that uses interfaces
646     on those bridges can be started.
647
648     This is a single-node call.
649
650     """
651     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
652
653   @_RpcTimeout(_TMO_NORMAL)
654   def call_instance_start(self, node, instance, hvp, bep):
655     """Starts an instance.
656
657     This is a single-node call.
658
659     """
660     idict = self._InstDict(instance, hvp=hvp, bep=bep)
661     return self._SingleNodeCall(node, "instance_start", [idict])
662
663   @_RpcTimeout(_TMO_NORMAL)
664   def call_instance_shutdown(self, node, instance, timeout):
665     """Stops an instance.
666
667     This is a single-node call.
668
669     """
670     return self._SingleNodeCall(node, "instance_shutdown",
671                                 [self._InstDict(instance), timeout])
672
673   @_RpcTimeout(_TMO_NORMAL)
674   def call_migration_info(self, node, instance):
675     """Gather the information necessary to prepare an instance migration.
676
677     This is a single-node call.
678
679     @type node: string
680     @param node: the node on which the instance is currently running
681     @type instance: C{objects.Instance}
682     @param instance: the instance definition
683
684     """
685     return self._SingleNodeCall(node, "migration_info",
686                                 [self._InstDict(instance)])
687
688   @_RpcTimeout(_TMO_NORMAL)
689   def call_accept_instance(self, node, instance, info, target):
690     """Prepare a node to accept an instance.
691
692     This is a single-node call.
693
694     @type node: string
695     @param node: the target node for the migration
696     @type instance: C{objects.Instance}
697     @param instance: the instance definition
698     @type info: opaque/hypervisor specific (string/data)
699     @param info: result for the call_migration_info call
700     @type target: string
701     @param target: target hostname (usually ip address) (on the node itself)
702
703     """
704     return self._SingleNodeCall(node, "accept_instance",
705                                 [self._InstDict(instance), info, target])
706
707   @_RpcTimeout(_TMO_NORMAL)
708   def call_finalize_migration(self, node, instance, info, success):
709     """Finalize any target-node migration specific operation.
710
711     This is called both in case of a successful migration and in case of error
712     (in which case it should abort the migration).
713
714     This is a single-node call.
715
716     @type node: string
717     @param node: the target node for the migration
718     @type instance: C{objects.Instance}
719     @param instance: the instance definition
720     @type info: opaque/hypervisor specific (string/data)
721     @param info: result for the call_migration_info call
722     @type success: boolean
723     @param success: whether the migration was a success or a failure
724
725     """
726     return self._SingleNodeCall(node, "finalize_migration",
727                                 [self._InstDict(instance), info, success])
728
729   @_RpcTimeout(_TMO_SLOW)
730   def call_instance_migrate(self, node, instance, target, live):
731     """Migrate an instance.
732
733     This is a single-node call.
734
735     @type node: string
736     @param node: the node on which the instance is currently running
737     @type instance: C{objects.Instance}
738     @param instance: the instance definition
739     @type target: string
740     @param target: the target node name
741     @type live: boolean
742     @param live: whether the migration should be done live or not (the
743         interpretation of this parameter is left to the hypervisor)
744
745     """
746     return self._SingleNodeCall(node, "instance_migrate",
747                                 [self._InstDict(instance), target, live])
748
749   @_RpcTimeout(_TMO_NORMAL)
750   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
751     """Reboots an instance.
752
753     This is a single-node call.
754
755     """
756     return self._SingleNodeCall(node, "instance_reboot",
757                                 [self._InstDict(inst), reboot_type,
758                                  shutdown_timeout])
759
760   @_RpcTimeout(_TMO_1DAY)
761   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
762     """Installs an OS on the given instance.
763
764     This is a single-node call.
765
766     """
767     return self._SingleNodeCall(node, "instance_os_add",
768                                 [self._InstDict(inst, osp=osparams),
769                                  reinstall, debug])
770
771   @_RpcTimeout(_TMO_SLOW)
772   def call_instance_run_rename(self, node, inst, old_name, debug):
773     """Run the OS rename script for an instance.
774
775     This is a single-node call.
776
777     """
778     return self._SingleNodeCall(node, "instance_run_rename",
779                                 [self._InstDict(inst), old_name, debug])
780
781   @_RpcTimeout(_TMO_URGENT)
782   def call_instance_info(self, node, instance, hname):
783     """Returns information about a single instance.
784
785     This is a single-node call.
786
787     @type node: list
788     @param node: the list of nodes to query
789     @type instance: string
790     @param instance: the instance name
791     @type hname: string
792     @param hname: the hypervisor type of the instance
793
794     """
795     return self._SingleNodeCall(node, "instance_info", [instance, hname])
796
797   @_RpcTimeout(_TMO_NORMAL)
798   def call_instance_migratable(self, node, instance):
799     """Checks whether the given instance can be migrated.
800
801     This is a single-node call.
802
803     @param node: the node to query
804     @type instance: L{objects.Instance}
805     @param instance: the instance to check
806
807
808     """
809     return self._SingleNodeCall(node, "instance_migratable",
810                                 [self._InstDict(instance)])
811
812   @_RpcTimeout(_TMO_URGENT)
813   def call_all_instances_info(self, node_list, hypervisor_list):
814     """Returns information about all instances on the given nodes.
815
816     This is a multi-node call.
817
818     @type node_list: list
819     @param node_list: the list of nodes to query
820     @type hypervisor_list: list
821     @param hypervisor_list: the hypervisors to query for instances
822
823     """
824     return self._MultiNodeCall(node_list, "all_instances_info",
825                                [hypervisor_list])
826
827   @_RpcTimeout(_TMO_URGENT)
828   def call_instance_list(self, node_list, hypervisor_list):
829     """Returns the list of running instances on a given node.
830
831     This is a multi-node call.
832
833     @type node_list: list
834     @param node_list: the list of nodes to query
835     @type hypervisor_list: list
836     @param hypervisor_list: the hypervisors to query for instances
837
838     """
839     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
840
841   @_RpcTimeout(_TMO_FAST)
842   def call_node_tcp_ping(self, node, source, target, port, timeout,
843                          live_port_needed):
844     """Do a TcpPing on the remote node
845
846     This is a single-node call.
847
848     """
849     return self._SingleNodeCall(node, "node_tcp_ping",
850                                 [source, target, port, timeout,
851                                  live_port_needed])
852
853   @_RpcTimeout(_TMO_FAST)
854   def call_node_has_ip_address(self, node, address):
855     """Checks if a node has the given IP address.
856
857     This is a single-node call.
858
859     """
860     return self._SingleNodeCall(node, "node_has_ip_address", [address])
861
862   @_RpcTimeout(_TMO_URGENT)
863   def call_node_info(self, node_list, vg_name, hypervisor_type):
864     """Return node information.
865
866     This will return memory information and volume group size and free
867     space.
868
869     This is a multi-node call.
870
871     @type node_list: list
872     @param node_list: the list of nodes to query
873     @type vg_name: C{string}
874     @param vg_name: the name of the volume group to ask for disk space
875         information
876     @type hypervisor_type: C{str}
877     @param hypervisor_type: the name of the hypervisor to ask for
878         memory information
879
880     """
881     return self._MultiNodeCall(node_list, "node_info",
882                                [vg_name, hypervisor_type])
883
884   @_RpcTimeout(_TMO_NORMAL)
885   def call_etc_hosts_modify(self, node, mode, name, ip):
886     """Modify hosts file with name
887
888     @type node: string
889     @param node: The node to call
890     @type mode: string
891     @param mode: The mode to operate. Currently "add" or "remove"
892     @type name: string
893     @param name: The host name to be modified
894     @type ip: string
895     @param ip: The ip of the entry (just valid if mode is "add")
896
897     """
898     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
899
900   @_RpcTimeout(_TMO_NORMAL)
901   def call_node_verify(self, node_list, checkdict, cluster_name):
902     """Request verification of given parameters.
903
904     This is a multi-node call.
905
906     """
907     return self._MultiNodeCall(node_list, "node_verify",
908                                [checkdict, cluster_name])
909
910   @classmethod
911   @_RpcTimeout(_TMO_FAST)
912   def call_node_start_master(cls, node, start_daemons, no_voting):
913     """Tells a node to activate itself as a master.
914
915     This is a single-node call.
916
917     """
918     return cls._StaticSingleNodeCall(node, "node_start_master",
919                                      [start_daemons, no_voting])
920
921   @classmethod
922   @_RpcTimeout(_TMO_FAST)
923   def call_node_stop_master(cls, node, stop_daemons):
924     """Tells a node to demote itself from master status.
925
926     This is a single-node call.
927
928     """
929     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
930
931   @classmethod
932   @_RpcTimeout(_TMO_URGENT)
933   def call_master_info(cls, node_list):
934     """Query master info.
935
936     This is a multi-node call.
937
938     """
939     # TODO: should this method query down nodes?
940     return cls._StaticMultiNodeCall(node_list, "master_info", [])
941
942   @classmethod
943   @_RpcTimeout(_TMO_URGENT)
944   def call_version(cls, node_list):
945     """Query node version.
946
947     This is a multi-node call.
948
949     """
950     return cls._StaticMultiNodeCall(node_list, "version", [])
951
952   @_RpcTimeout(_TMO_NORMAL)
953   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
954     """Request creation of a given block device.
955
956     This is a single-node call.
957
958     """
959     return self._SingleNodeCall(node, "blockdev_create",
960                                 [bdev.ToDict(), size, owner, on_primary, info])
961
962   @_RpcTimeout(_TMO_SLOW)
963   def call_blockdev_wipe(self, node, bdev, offset, size):
964     """Request wipe at given offset with given size of a block device.
965
966     This is a single-node call.
967
968     """
969     return self._SingleNodeCall(node, "blockdev_wipe",
970                                 [bdev.ToDict(), offset, size])
971
972   @_RpcTimeout(_TMO_NORMAL)
973   def call_blockdev_remove(self, node, bdev):
974     """Request removal of a given block device.
975
976     This is a single-node call.
977
978     """
979     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
980
981   @_RpcTimeout(_TMO_NORMAL)
982   def call_blockdev_rename(self, node, devlist):
983     """Request rename of the given block devices.
984
985     This is a single-node call.
986
987     """
988     return self._SingleNodeCall(node, "blockdev_rename",
989                                 [(d.ToDict(), uid) for d, uid in devlist])
990
991   @_RpcTimeout(_TMO_NORMAL)
992   def call_blockdev_pause_resume_sync(self, node, disks, pause):
993     """Request a pause/resume of given block device.
994
995     This is a single-node call.
996
997     """
998     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
999                                 [[bdev.ToDict() for bdev in disks], pause])
1000
1001   @_RpcTimeout(_TMO_NORMAL)
1002   def call_blockdev_assemble(self, node, disk, owner, on_primary):
1003     """Request assembling of a given block device.
1004
1005     This is a single-node call.
1006
1007     """
1008     return self._SingleNodeCall(node, "blockdev_assemble",
1009                                 [disk.ToDict(), owner, on_primary])
1010
1011   @_RpcTimeout(_TMO_NORMAL)
1012   def call_blockdev_shutdown(self, node, disk):
1013     """Request shutdown of a given block device.
1014
1015     This is a single-node call.
1016
1017     """
1018     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1019
1020   @_RpcTimeout(_TMO_NORMAL)
1021   def call_blockdev_addchildren(self, node, bdev, ndevs):
1022     """Request adding a list of children to a (mirroring) device.
1023
1024     This is a single-node call.
1025
1026     """
1027     return self._SingleNodeCall(node, "blockdev_addchildren",
1028                                 [bdev.ToDict(),
1029                                  [disk.ToDict() for disk in ndevs]])
1030
1031   @_RpcTimeout(_TMO_NORMAL)
1032   def call_blockdev_removechildren(self, node, bdev, ndevs):
1033     """Request removing a list of children from a (mirroring) device.
1034
1035     This is a single-node call.
1036
1037     """
1038     return self._SingleNodeCall(node, "blockdev_removechildren",
1039                                 [bdev.ToDict(),
1040                                  [disk.ToDict() for disk in ndevs]])
1041
1042   @_RpcTimeout(_TMO_NORMAL)
1043   def call_blockdev_getmirrorstatus(self, node, disks):
1044     """Request status of a (mirroring) device.
1045
1046     This is a single-node call.
1047
1048     """
1049     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1050                                   [dsk.ToDict() for dsk in disks])
1051     if not result.fail_msg:
1052       result.payload = [objects.BlockDevStatus.FromDict(i)
1053                         for i in result.payload]
1054     return result
1055
1056   @_RpcTimeout(_TMO_NORMAL)
1057   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1058     """Request status of (mirroring) devices from multiple nodes.
1059
1060     This is a multi-node call.
1061
1062     """
1063     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1064                                  [dict((name, [dsk.ToDict() for dsk in disks])
1065                                        for name, disks in node_disks.items())])
1066     for nres in result.values():
1067       if nres.fail_msg:
1068         continue
1069
1070       for idx, (success, status) in enumerate(nres.payload):
1071         if success:
1072           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1073
1074     return result
1075
1076   @_RpcTimeout(_TMO_NORMAL)
1077   def call_blockdev_find(self, node, disk):
1078     """Request identification of a given block device.
1079
1080     This is a single-node call.
1081
1082     """
1083     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1084     if not result.fail_msg and result.payload is not None:
1085       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1086     return result
1087
1088   @_RpcTimeout(_TMO_NORMAL)
1089   def call_blockdev_close(self, node, instance_name, disks):
1090     """Closes the given block devices.
1091
1092     This is a single-node call.
1093
1094     """
1095     params = [instance_name, [cf.ToDict() for cf in disks]]
1096     return self._SingleNodeCall(node, "blockdev_close", params)
1097
1098   @_RpcTimeout(_TMO_NORMAL)
1099   def call_blockdev_getsizes(self, node, disks):
1100     """Returns the size of the given disks.
1101
1102     This is a single-node call.
1103
1104     """
1105     params = [[cf.ToDict() for cf in disks]]
1106     return self._SingleNodeCall(node, "blockdev_getsize", params)
1107
1108   @_RpcTimeout(_TMO_NORMAL)
1109   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1110     """Disconnects the network of the given drbd devices.
1111
1112     This is a multi-node call.
1113
1114     """
1115     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1116                                [nodes_ip, [cf.ToDict() for cf in disks]])
1117
1118   @_RpcTimeout(_TMO_NORMAL)
1119   def call_drbd_attach_net(self, node_list, nodes_ip,
1120                            disks, instance_name, multimaster):
1121     """Disconnects the given drbd devices.
1122
1123     This is a multi-node call.
1124
1125     """
1126     return self._MultiNodeCall(node_list, "drbd_attach_net",
1127                                [nodes_ip, [cf.ToDict() for cf in disks],
1128                                 instance_name, multimaster])
1129
1130   @_RpcTimeout(_TMO_SLOW)
1131   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1132     """Waits for the synchronization of drbd devices is complete.
1133
1134     This is a multi-node call.
1135
1136     """
1137     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1138                                [nodes_ip, [cf.ToDict() for cf in disks]])
1139
1140   @_RpcTimeout(_TMO_URGENT)
1141   def call_drbd_helper(self, node_list):
1142     """Gets drbd helper.
1143
1144     This is a multi-node call.
1145
1146     """
1147     return self._MultiNodeCall(node_list, "drbd_helper", [])
1148
1149   @classmethod
1150   @_RpcTimeout(_TMO_NORMAL)
1151   def call_upload_file(cls, node_list, file_name, address_list=None):
1152     """Upload a file.
1153
1154     The node will refuse the operation in case the file is not on the
1155     approved file list.
1156
1157     This is a multi-node call.
1158
1159     @type node_list: list
1160     @param node_list: the list of node names to upload to
1161     @type file_name: str
1162     @param file_name: the filename to upload
1163     @type address_list: list or None
1164     @keyword address_list: an optional list of node addresses, in order
1165         to optimize the RPC speed
1166
1167     """
1168     file_contents = utils.ReadFile(file_name)
1169     data = cls._Compress(file_contents)
1170     st = os.stat(file_name)
1171     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1172               st.st_atime, st.st_mtime]
1173     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1174                                     address_list=address_list)
1175
1176   @classmethod
1177   @_RpcTimeout(_TMO_NORMAL)
1178   def call_write_ssconf_files(cls, node_list, values):
1179     """Write ssconf files.
1180
1181     This is a multi-node call.
1182
1183     """
1184     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1185
1186   @_RpcTimeout(_TMO_NORMAL)
1187   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1188     """Runs OOB.
1189
1190     This is a single-node call.
1191
1192     """
1193     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1194                                                   remote_node, timeout])
1195
1196   @_RpcTimeout(_TMO_FAST)
1197   def call_os_diagnose(self, node_list):
1198     """Request a diagnose of OS definitions.
1199
1200     This is a multi-node call.
1201
1202     """
1203     return self._MultiNodeCall(node_list, "os_diagnose", [])
1204
1205   @_RpcTimeout(_TMO_FAST)
1206   def call_os_get(self, node, name):
1207     """Returns an OS definition.
1208
1209     This is a single-node call.
1210
1211     """
1212     result = self._SingleNodeCall(node, "os_get", [name])
1213     if not result.fail_msg and isinstance(result.payload, dict):
1214       result.payload = objects.OS.FromDict(result.payload)
1215     return result
1216
1217   @_RpcTimeout(_TMO_FAST)
1218   def call_os_validate(self, required, nodes, name, checks, params):
1219     """Run a validation routine for a given OS.
1220
1221     This is a multi-node call.
1222
1223     """
1224     return self._MultiNodeCall(nodes, "os_validate",
1225                                [required, name, checks, params])
1226
1227   @_RpcTimeout(_TMO_NORMAL)
1228   def call_hooks_runner(self, node_list, hpath, phase, env):
1229     """Call the hooks runner.
1230
1231     Args:
1232       - op: the OpCode instance
1233       - env: a dictionary with the environment
1234
1235     This is a multi-node call.
1236
1237     """
1238     params = [hpath, phase, env]
1239     return self._MultiNodeCall(node_list, "hooks_runner", params)
1240
1241   @_RpcTimeout(_TMO_NORMAL)
1242   def call_iallocator_runner(self, node, name, idata):
1243     """Call an iallocator on a remote node
1244
1245     Args:
1246       - name: the iallocator name
1247       - input: the json-encoded input string
1248
1249     This is a single-node call.
1250
1251     """
1252     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1253
1254   @_RpcTimeout(_TMO_NORMAL)
1255   def call_blockdev_grow(self, node, cf_bdev, amount):
1256     """Request a snapshot of the given block device.
1257
1258     This is a single-node call.
1259
1260     """
1261     return self._SingleNodeCall(node, "blockdev_grow",
1262                                 [cf_bdev.ToDict(), amount])
1263
1264   @_RpcTimeout(_TMO_1DAY)
1265   def call_blockdev_export(self, node, cf_bdev,
1266                            dest_node, dest_path, cluster_name):
1267     """Export a given disk to another node.
1268
1269     This is a single-node call.
1270
1271     """
1272     return self._SingleNodeCall(node, "blockdev_export",
1273                                 [cf_bdev.ToDict(), dest_node, dest_path,
1274                                  cluster_name])
1275
1276   @_RpcTimeout(_TMO_NORMAL)
1277   def call_blockdev_snapshot(self, node, cf_bdev):
1278     """Request a snapshot of the given block device.
1279
1280     This is a single-node call.
1281
1282     """
1283     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1284
1285   @_RpcTimeout(_TMO_NORMAL)
1286   def call_finalize_export(self, node, instance, snap_disks):
1287     """Request the completion of an export operation.
1288
1289     This writes the export config file, etc.
1290
1291     This is a single-node call.
1292
1293     """
1294     flat_disks = []
1295     for disk in snap_disks:
1296       if isinstance(disk, bool):
1297         flat_disks.append(disk)
1298       else:
1299         flat_disks.append(disk.ToDict())
1300
1301     return self._SingleNodeCall(node, "finalize_export",
1302                                 [self._InstDict(instance), flat_disks])
1303
1304   @_RpcTimeout(_TMO_FAST)
1305   def call_export_info(self, node, path):
1306     """Queries the export information in a given path.
1307
1308     This is a single-node call.
1309
1310     """
1311     return self._SingleNodeCall(node, "export_info", [path])
1312
1313   @_RpcTimeout(_TMO_FAST)
1314   def call_export_list(self, node_list):
1315     """Gets the stored exports list.
1316
1317     This is a multi-node call.
1318
1319     """
1320     return self._MultiNodeCall(node_list, "export_list", [])
1321
1322   @_RpcTimeout(_TMO_FAST)
1323   def call_export_remove(self, node, export):
1324     """Requests removal of a given export.
1325
1326     This is a single-node call.
1327
1328     """
1329     return self._SingleNodeCall(node, "export_remove", [export])
1330
1331   @classmethod
1332   @_RpcTimeout(_TMO_NORMAL)
1333   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1334     """Requests a node to clean the cluster information it has.
1335
1336     This will remove the configuration information from the ganeti data
1337     dir.
1338
1339     This is a single-node call.
1340
1341     """
1342     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1343                                      [modify_ssh_setup])
1344
1345   @_RpcTimeout(_TMO_FAST)
1346   def call_node_volumes(self, node_list):
1347     """Gets all volumes on node(s).
1348
1349     This is a multi-node call.
1350
1351     """
1352     return self._MultiNodeCall(node_list, "node_volumes", [])
1353
1354   @_RpcTimeout(_TMO_FAST)
1355   def call_node_demote_from_mc(self, node):
1356     """Demote a node from the master candidate role.
1357
1358     This is a single-node call.
1359
1360     """
1361     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1362
1363   @_RpcTimeout(_TMO_NORMAL)
1364   def call_node_powercycle(self, node, hypervisor):
1365     """Tries to powercycle a node.
1366
1367     This is a single-node call.
1368
1369     """
1370     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1371
1372   @_RpcTimeout(None)
1373   def call_test_delay(self, node_list, duration):
1374     """Sleep for a fixed time on given node(s).
1375
1376     This is a multi-node call.
1377
1378     """
1379     return self._MultiNodeCall(node_list, "test_delay", [duration],
1380                                read_timeout=int(duration + 5))
1381
1382   @_RpcTimeout(_TMO_FAST)
1383   def call_file_storage_dir_create(self, node, file_storage_dir):
1384     """Create the given file storage directory.
1385
1386     This is a single-node call.
1387
1388     """
1389     return self._SingleNodeCall(node, "file_storage_dir_create",
1390                                 [file_storage_dir])
1391
1392   @_RpcTimeout(_TMO_FAST)
1393   def call_file_storage_dir_remove(self, node, file_storage_dir):
1394     """Remove the given file storage directory.
1395
1396     This is a single-node call.
1397
1398     """
1399     return self._SingleNodeCall(node, "file_storage_dir_remove",
1400                                 [file_storage_dir])
1401
1402   @_RpcTimeout(_TMO_FAST)
1403   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1404                                    new_file_storage_dir):
1405     """Rename file storage directory.
1406
1407     This is a single-node call.
1408
1409     """
1410     return self._SingleNodeCall(node, "file_storage_dir_rename",
1411                                 [old_file_storage_dir, new_file_storage_dir])
1412
1413   @classmethod
1414   @_RpcTimeout(_TMO_URGENT)
1415   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1416     """Update job queue.
1417
1418     This is a multi-node call.
1419
1420     """
1421     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1422                                     [file_name, cls._Compress(content)],
1423                                     address_list=address_list)
1424
1425   @classmethod
1426   @_RpcTimeout(_TMO_NORMAL)
1427   def call_jobqueue_purge(cls, node):
1428     """Purge job queue.
1429
1430     This is a single-node call.
1431
1432     """
1433     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1434
1435   @classmethod
1436   @_RpcTimeout(_TMO_URGENT)
1437   def call_jobqueue_rename(cls, node_list, address_list, rename):
1438     """Rename a job queue file.
1439
1440     This is a multi-node call.
1441
1442     """
1443     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1444                                     address_list=address_list)
1445
1446   @_RpcTimeout(_TMO_NORMAL)
1447   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1448     """Validate the hypervisor params.
1449
1450     This is a multi-node call.
1451
1452     @type node_list: list
1453     @param node_list: the list of nodes to query
1454     @type hvname: string
1455     @param hvname: the hypervisor name
1456     @type hvparams: dict
1457     @param hvparams: the hypervisor parameters to be validated
1458
1459     """
1460     cluster = self._cfg.GetClusterInfo()
1461     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1462     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1463                                [hvname, hv_full])
1464
1465   @_RpcTimeout(_TMO_NORMAL)
1466   def call_x509_cert_create(self, node, validity):
1467     """Creates a new X509 certificate for SSL/TLS.
1468
1469     This is a single-node call.
1470
1471     @type validity: int
1472     @param validity: Validity in seconds
1473
1474     """
1475     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1476
1477   @_RpcTimeout(_TMO_NORMAL)
1478   def call_x509_cert_remove(self, node, name):
1479     """Removes a X509 certificate.
1480
1481     This is a single-node call.
1482
1483     @type name: string
1484     @param name: Certificate name
1485
1486     """
1487     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1488
1489   @_RpcTimeout(_TMO_NORMAL)
1490   def call_import_start(self, node, opts, instance, dest, dest_args):
1491     """Starts a listener for an import.
1492
1493     This is a single-node call.
1494
1495     @type node: string
1496     @param node: Node name
1497     @type instance: C{objects.Instance}
1498     @param instance: Instance object
1499
1500     """
1501     return self._SingleNodeCall(node, "import_start",
1502                                 [opts.ToDict(),
1503                                  self._InstDict(instance), dest,
1504                                  _EncodeImportExportIO(dest, dest_args)])
1505
1506   @_RpcTimeout(_TMO_NORMAL)
1507   def call_export_start(self, node, opts, host, port,
1508                         instance, source, source_args):
1509     """Starts an export daemon.
1510
1511     This is a single-node call.
1512
1513     @type node: string
1514     @param node: Node name
1515     @type instance: C{objects.Instance}
1516     @param instance: Instance object
1517
1518     """
1519     return self._SingleNodeCall(node, "export_start",
1520                                 [opts.ToDict(), host, port,
1521                                  self._InstDict(instance), source,
1522                                  _EncodeImportExportIO(source, source_args)])
1523
1524   @_RpcTimeout(_TMO_FAST)
1525   def call_impexp_status(self, node, names):
1526     """Gets the status of an import or export.
1527
1528     This is a single-node call.
1529
1530     @type node: string
1531     @param node: Node name
1532     @type names: List of strings
1533     @param names: Import/export names
1534     @rtype: List of L{objects.ImportExportStatus} instances
1535     @return: Returns a list of the state of each named import/export or None if
1536              a status couldn't be retrieved
1537
1538     """
1539     result = self._SingleNodeCall(node, "impexp_status", [names])
1540
1541     if not result.fail_msg:
1542       decoded = []
1543
1544       for i in result.payload:
1545         if i is None:
1546           decoded.append(None)
1547           continue
1548         decoded.append(objects.ImportExportStatus.FromDict(i))
1549
1550       result.payload = decoded
1551
1552     return result
1553
1554   @_RpcTimeout(_TMO_NORMAL)
1555   def call_impexp_abort(self, node, name):
1556     """Aborts an import or export.
1557
1558     This is a single-node call.
1559
1560     @type node: string
1561     @param node: Node name
1562     @type name: string
1563     @param name: Import/export name
1564
1565     """
1566     return self._SingleNodeCall(node, "impexp_abort", [name])
1567
1568   @_RpcTimeout(_TMO_NORMAL)
1569   def call_impexp_cleanup(self, node, name):
1570     """Cleans up after an import or export.
1571
1572     This is a single-node call.
1573
1574     @type node: string
1575     @param node: Node name
1576     @type name: string
1577     @param name: Import/export name
1578
1579     """
1580     return self._SingleNodeCall(node, "impexp_cleanup", [name])