objects: Add custom de-/serializing code for query responses
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   "Expect:",
59   ]
60
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75
76 _TIMEOUTS = {
77 }
78
79
80 def Init():
81   """Initializes the module-global HTTP client manager.
82
83   Must be called before using any RPC function and while exactly one thread is
84   running.
85
86   """
87   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88   # one thread running. This check is just a safety measure -- it doesn't
89   # cover all cases.
90   assert threading.activeCount() == 1, \
91          "Found more than one active thread when initializing pycURL"
92
93   logging.info("Using PycURL %s", pycurl.version)
94
95   pycurl.global_init(pycurl.GLOBAL_ALL)
96
97
98 def Shutdown():
99   """Stops the module-global HTTP client manager.
100
101   Must be called before quitting the program and while exactly one thread is
102   running.
103
104   """
105   pycurl.global_cleanup()
106
107
108 def _ConfigRpcCurl(curl):
109   noded_cert = str(constants.NODED_CERT_FILE)
110
111   curl.setopt(pycurl.FOLLOWLOCATION, False)
112   curl.setopt(pycurl.CAINFO, noded_cert)
113   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114   curl.setopt(pycurl.SSL_VERIFYPEER, True)
115   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116   curl.setopt(pycurl.SSLCERT, noded_cert)
117   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118   curl.setopt(pycurl.SSLKEY, noded_cert)
119   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
125
126
127 class _RpcThreadLocal(_threading.local):
128   def GetHttpClientPool(self):
129     """Returns a per-thread HTTP client pool.
130
131     @rtype: L{http.client.HttpClientPool}
132
133     """
134     try:
135       pool = self.hcp
136     except AttributeError:
137       pool = http.client.HttpClientPool(_ConfigRpcCurl)
138       self.hcp = pool
139
140     return pool
141
142
143 # Remove module alias (see above)
144 del _threading
145
146
147 _thread_local = _RpcThreadLocal()
148
149
150 def _RpcTimeout(secs):
151   """Timeout decorator.
152
153   When applied to a rpc call_* function, it updates the global timeout
154   table with the given function/timeout.
155
156   """
157   def decorator(f):
158     name = f.__name__
159     assert name.startswith("call_")
160     _TIMEOUTS[name[len("call_"):]] = secs
161     return f
162   return decorator
163
164
165 def RunWithRPC(fn):
166   """RPC-wrapper decorator.
167
168   When applied to a function, it runs it with the RPC system
169   initialized, and it shutsdown the system afterwards. This means the
170   function must be called without RPC being initialized.
171
172   """
173   def wrapper(*args, **kwargs):
174     Init()
175     try:
176       return fn(*args, **kwargs)
177     finally:
178       Shutdown()
179   return wrapper
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     assert hasattr(self, "call")
230     assert hasattr(self, "data")
231     assert hasattr(self, "fail_msg")
232     assert hasattr(self, "node")
233     assert hasattr(self, "offline")
234     assert hasattr(self, "payload")
235
236   @staticmethod
237   def _EnsureErr(val):
238     """Helper to ensure we return a 'True' value for error."""
239     if val:
240       return val
241     else:
242       return "No error information"
243
244   def Raise(self, msg, prereq=False, ecode=None):
245     """If the result has failed, raise an OpExecError.
246
247     This is used so that LU code doesn't have to check for each
248     result, but instead can call this function.
249
250     """
251     if not self.fail_msg:
252       return
253
254     if not msg: # one could pass None for default message
255       msg = ("Call '%s' to node '%s' has failed: %s" %
256              (self.call, self.node, self.fail_msg))
257     else:
258       msg = "%s: %s" % (msg, self.fail_msg)
259     if prereq:
260       ec = errors.OpPrereqError
261     else:
262       ec = errors.OpExecError
263     if ecode is not None:
264       args = (msg, ecode)
265     else:
266       args = (msg, )
267     raise ec(*args) # pylint: disable-msg=W0142
268
269
270 def _AddressLookup(node_list,
271                    ssc=ssconf.SimpleStore,
272                    nslookup_fn=netutils.Hostname.GetIP):
273   """Return addresses for given node names.
274
275   @type node_list: list
276   @param node_list: List of node names
277   @type ssc: class
278   @param ssc: SimpleStore class that is used to obtain node->ip mappings
279   @type nslookup_fn: callable
280   @param nslookup_fn: function use to do NS lookup
281   @rtype: list of addresses and/or None's
282   @returns: List of corresponding addresses, if found
283
284   """
285   ss = ssc()
286   iplist = ss.GetNodePrimaryIPList()
287   family = ss.GetPrimaryIPFamily()
288   addresses = []
289   ipmap = dict(entry.split() for entry in iplist)
290   for node in node_list:
291     address = ipmap.get(node)
292     if address is None:
293       address = nslookup_fn(node, family=family)
294     addresses.append(address)
295
296   return addresses
297
298
299 class Client:
300   """RPC Client class.
301
302   This class, given a (remote) method name, a list of parameters and a
303   list of nodes, will contact (in parallel) all nodes, and return a
304   dict of results (key: node name, value: result).
305
306   One current bug is that generic failure is still signaled by
307   'False' result, which is not good. This overloading of values can
308   cause bugs.
309
310   """
311   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
312     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
313                                     " timeouts table")
314     self.procedure = procedure
315     self.body = body
316     self.port = port
317     self._request = {}
318     self._address_lookup_fn = address_lookup_fn
319
320   def ConnectList(self, node_list, address_list=None, read_timeout=None):
321     """Add a list of nodes to the target nodes.
322
323     @type node_list: list
324     @param node_list: the list of node names to connect
325     @type address_list: list or None
326     @keyword address_list: either None or a list with node addresses,
327         which must have the same length as the node list
328     @type read_timeout: int
329     @param read_timeout: overwrites default timeout for operation
330
331     """
332     if address_list is None:
333       # Always use IP address instead of node name
334       address_list = self._address_lookup_fn(node_list)
335
336     assert len(node_list) == len(address_list), \
337            "Name and address lists must have the same length"
338
339     for node, address in zip(node_list, address_list):
340       self.ConnectNode(node, address, read_timeout=read_timeout)
341
342   def ConnectNode(self, name, address=None, read_timeout=None):
343     """Add a node to the target list.
344
345     @type name: str
346     @param name: the node name
347     @type address: str
348     @param address: the node address, if known
349     @type read_timeout: int
350     @param read_timeout: overwrites default timeout for operation
351
352     """
353     if address is None:
354       # Always use IP address instead of node name
355       address = self._address_lookup_fn([name])[0]
356
357     assert(address is not None)
358
359     if read_timeout is None:
360       read_timeout = _TIMEOUTS[self.procedure]
361
362     self._request[name] = \
363       http.client.HttpClientRequest(str(address), self.port,
364                                     http.HTTP_PUT, str("/%s" % self.procedure),
365                                     headers=_RPC_CLIENT_HEADERS,
366                                     post_data=str(self.body),
367                                     read_timeout=read_timeout)
368
369   def GetResults(self, http_pool=None):
370     """Call nodes and return results.
371
372     @rtype: list
373     @return: List of RPC results
374
375     """
376     if not http_pool:
377       http_pool = _thread_local.GetHttpClientPool()
378
379     http_pool.ProcessRequests(self._request.values())
380
381     results = {}
382
383     for name, req in self._request.iteritems():
384       if req.success and req.resp_status_code == http.HTTP_OK:
385         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386                                   node=name, call=self.procedure)
387         continue
388
389       # TODO: Better error reporting
390       if req.error:
391         msg = req.error
392       else:
393         msg = req.resp_body
394
395       logging.error("RPC error in %s from node %s: %s",
396                     self.procedure, name, msg)
397       results[name] = RpcResult(data=msg, failed=True, node=name,
398                                 call=self.procedure)
399
400     return results
401
402
403 def _EncodeImportExportIO(ieio, ieioargs):
404   """Encodes import/export I/O information.
405
406   """
407   if ieio == constants.IEIO_RAW_DISK:
408     assert len(ieioargs) == 1
409     return (ieioargs[0].ToDict(), )
410
411   if ieio == constants.IEIO_SCRIPT:
412     assert len(ieioargs) == 2
413     return (ieioargs[0].ToDict(), ieioargs[1])
414
415   return ieioargs
416
417
418 class RpcRunner(object):
419   """RPC runner class"""
420
421   def __init__(self, cfg):
422     """Initialized the rpc runner.
423
424     @type cfg:  C{config.ConfigWriter}
425     @param cfg: the configuration object that will be used to get data
426                 about the cluster
427
428     """
429     self._cfg = cfg
430     self.port = netutils.GetDaemonPort(constants.NODED)
431
432   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
433     """Convert the given instance to a dict.
434
435     This is done via the instance's ToDict() method and additionally
436     we fill the hvparams with the cluster defaults.
437
438     @type instance: L{objects.Instance}
439     @param instance: an Instance object
440     @type hvp: dict or None
441     @param hvp: a dictionary with overridden hypervisor parameters
442     @type bep: dict or None
443     @param bep: a dictionary with overridden backend parameters
444     @type osp: dict or None
445     @param osp: a dictionary with overridden os parameters
446     @rtype: dict
447     @return: the instance dict, with the hvparams filled with the
448         cluster defaults
449
450     """
451     idict = instance.ToDict()
452     cluster = self._cfg.GetClusterInfo()
453     idict["hvparams"] = cluster.FillHV(instance)
454     if hvp is not None:
455       idict["hvparams"].update(hvp)
456     idict["beparams"] = cluster.FillBE(instance)
457     if bep is not None:
458       idict["beparams"].update(bep)
459     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
460     if osp is not None:
461       idict["osparams"].update(osp)
462     for nic in idict["nics"]:
463       nic['nicparams'] = objects.FillDict(
464         cluster.nicparams[constants.PP_DEFAULT],
465         nic['nicparams'])
466     return idict
467
468   def _ConnectList(self, client, node_list, call, read_timeout=None):
469     """Helper for computing node addresses.
470
471     @type client: L{ganeti.rpc.Client}
472     @param client: a C{Client} instance
473     @type node_list: list
474     @param node_list: the node list we should connect
475     @type call: string
476     @param call: the name of the remote procedure call, for filling in
477         correctly any eventual offline nodes' results
478     @type read_timeout: int
479     @param read_timeout: overwrites the default read timeout for the
480         given operation
481
482     """
483     all_nodes = self._cfg.GetAllNodesInfo()
484     name_list = []
485     addr_list = []
486     skip_dict = {}
487     for node in node_list:
488       if node in all_nodes:
489         if all_nodes[node].offline:
490           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
491           continue
492         val = all_nodes[node].primary_ip
493       else:
494         val = None
495       addr_list.append(val)
496       name_list.append(node)
497     if name_list:
498       client.ConnectList(name_list, address_list=addr_list,
499                          read_timeout=read_timeout)
500     return skip_dict
501
502   def _ConnectNode(self, client, node, call, read_timeout=None):
503     """Helper for computing one node's address.
504
505     @type client: L{ganeti.rpc.Client}
506     @param client: a C{Client} instance
507     @type node: str
508     @param node: the node we should connect
509     @type call: string
510     @param call: the name of the remote procedure call, for filling in
511         correctly any eventual offline nodes' results
512     @type read_timeout: int
513     @param read_timeout: overwrites the default read timeout for the
514         given operation
515
516     """
517     node_info = self._cfg.GetNodeInfo(node)
518     if node_info is not None:
519       if node_info.offline:
520         return RpcResult(node=node, offline=True, call=call)
521       addr = node_info.primary_ip
522     else:
523       addr = None
524     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
525
526   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
527     """Helper for making a multi-node call
528
529     """
530     body = serializer.DumpJson(args, indent=False)
531     c = Client(procedure, body, self.port)
532     skip_dict = self._ConnectList(c, node_list, procedure,
533                                   read_timeout=read_timeout)
534     skip_dict.update(c.GetResults())
535     return skip_dict
536
537   @classmethod
538   def _StaticMultiNodeCall(cls, node_list, procedure, args,
539                            address_list=None, read_timeout=None):
540     """Helper for making a multi-node static call
541
542     """
543     body = serializer.DumpJson(args, indent=False)
544     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
545     c.ConnectList(node_list, address_list=address_list,
546                   read_timeout=read_timeout)
547     return c.GetResults()
548
549   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
550     """Helper for making a single-node call
551
552     """
553     body = serializer.DumpJson(args, indent=False)
554     c = Client(procedure, body, self.port)
555     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
556     if result is None:
557       # we did connect, node is not offline
558       result = c.GetResults()[node]
559     return result
560
561   @classmethod
562   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
563     """Helper for making a single-node static call
564
565     """
566     body = serializer.DumpJson(args, indent=False)
567     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
568     c.ConnectNode(node, read_timeout=read_timeout)
569     return c.GetResults()[node]
570
571   @staticmethod
572   def _Compress(data):
573     """Compresses a string for transport over RPC.
574
575     Small amounts of data are not compressed.
576
577     @type data: str
578     @param data: Data
579     @rtype: tuple
580     @return: Encoded data to send
581
582     """
583     # Small amounts of data are not compressed
584     if len(data) < 512:
585       return (constants.RPC_ENCODING_NONE, data)
586
587     # Compress with zlib and encode in base64
588     return (constants.RPC_ENCODING_ZLIB_BASE64,
589             base64.b64encode(zlib.compress(data, 3)))
590
591   #
592   # Begin RPC calls
593   #
594
595   @_RpcTimeout(_TMO_URGENT)
596   def call_lv_list(self, node_list, vg_name):
597     """Gets the logical volumes present in a given volume group.
598
599     This is a multi-node call.
600
601     """
602     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603
604   @_RpcTimeout(_TMO_URGENT)
605   def call_vg_list(self, node_list):
606     """Gets the volume group list.
607
608     This is a multi-node call.
609
610     """
611     return self._MultiNodeCall(node_list, "vg_list", [])
612
613   @_RpcTimeout(_TMO_NORMAL)
614   def call_storage_list(self, node_list, su_name, su_args, name, fields):
615     """Get list of storage units.
616
617     This is a multi-node call.
618
619     """
620     return self._MultiNodeCall(node_list, "storage_list",
621                                [su_name, su_args, name, fields])
622
623   @_RpcTimeout(_TMO_NORMAL)
624   def call_storage_modify(self, node, su_name, su_args, name, changes):
625     """Modify a storage unit.
626
627     This is a single-node call.
628
629     """
630     return self._SingleNodeCall(node, "storage_modify",
631                                 [su_name, su_args, name, changes])
632
633   @_RpcTimeout(_TMO_NORMAL)
634   def call_storage_execute(self, node, su_name, su_args, name, op):
635     """Executes an operation on a storage unit.
636
637     This is a single-node call.
638
639     """
640     return self._SingleNodeCall(node, "storage_execute",
641                                 [su_name, su_args, name, op])
642
643   @_RpcTimeout(_TMO_URGENT)
644   def call_bridges_exist(self, node, bridges_list):
645     """Checks if a node has all the bridges given.
646
647     This method checks if all bridges given in the bridges_list are
648     present on the remote node, so that an instance that uses interfaces
649     on those bridges can be started.
650
651     This is a single-node call.
652
653     """
654     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655
656   @_RpcTimeout(_TMO_NORMAL)
657   def call_instance_start(self, node, instance, hvp, bep):
658     """Starts an instance.
659
660     This is a single-node call.
661
662     """
663     idict = self._InstDict(instance, hvp=hvp, bep=bep)
664     return self._SingleNodeCall(node, "instance_start", [idict])
665
666   @_RpcTimeout(_TMO_NORMAL)
667   def call_instance_shutdown(self, node, instance, timeout):
668     """Stops an instance.
669
670     This is a single-node call.
671
672     """
673     return self._SingleNodeCall(node, "instance_shutdown",
674                                 [self._InstDict(instance), timeout])
675
676   @_RpcTimeout(_TMO_NORMAL)
677   def call_migration_info(self, node, instance):
678     """Gather the information necessary to prepare an instance migration.
679
680     This is a single-node call.
681
682     @type node: string
683     @param node: the node on which the instance is currently running
684     @type instance: C{objects.Instance}
685     @param instance: the instance definition
686
687     """
688     return self._SingleNodeCall(node, "migration_info",
689                                 [self._InstDict(instance)])
690
691   @_RpcTimeout(_TMO_NORMAL)
692   def call_accept_instance(self, node, instance, info, target):
693     """Prepare a node to accept an instance.
694
695     This is a single-node call.
696
697     @type node: string
698     @param node: the target node for the migration
699     @type instance: C{objects.Instance}
700     @param instance: the instance definition
701     @type info: opaque/hypervisor specific (string/data)
702     @param info: result for the call_migration_info call
703     @type target: string
704     @param target: target hostname (usually ip address) (on the node itself)
705
706     """
707     return self._SingleNodeCall(node, "accept_instance",
708                                 [self._InstDict(instance), info, target])
709
710   @_RpcTimeout(_TMO_NORMAL)
711   def call_finalize_migration(self, node, instance, info, success):
712     """Finalize any target-node migration specific operation.
713
714     This is called both in case of a successful migration and in case of error
715     (in which case it should abort the migration).
716
717     This is a single-node call.
718
719     @type node: string
720     @param node: the target node for the migration
721     @type instance: C{objects.Instance}
722     @param instance: the instance definition
723     @type info: opaque/hypervisor specific (string/data)
724     @param info: result for the call_migration_info call
725     @type success: boolean
726     @param success: whether the migration was a success or a failure
727
728     """
729     return self._SingleNodeCall(node, "finalize_migration",
730                                 [self._InstDict(instance), info, success])
731
732   @_RpcTimeout(_TMO_SLOW)
733   def call_instance_migrate(self, node, instance, target, live):
734     """Migrate an instance.
735
736     This is a single-node call.
737
738     @type node: string
739     @param node: the node on which the instance is currently running
740     @type instance: C{objects.Instance}
741     @param instance: the instance definition
742     @type target: string
743     @param target: the target node name
744     @type live: boolean
745     @param live: whether the migration should be done live or not (the
746         interpretation of this parameter is left to the hypervisor)
747
748     """
749     return self._SingleNodeCall(node, "instance_migrate",
750                                 [self._InstDict(instance), target, live])
751
752   @_RpcTimeout(_TMO_NORMAL)
753   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
754     """Reboots an instance.
755
756     This is a single-node call.
757
758     """
759     return self._SingleNodeCall(node, "instance_reboot",
760                                 [self._InstDict(inst), reboot_type,
761                                  shutdown_timeout])
762
763   @_RpcTimeout(_TMO_1DAY)
764   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
765     """Installs an OS on the given instance.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "instance_os_add",
771                                 [self._InstDict(inst, osp=osparams),
772                                  reinstall, debug])
773
774   @_RpcTimeout(_TMO_SLOW)
775   def call_instance_run_rename(self, node, inst, old_name, debug):
776     """Run the OS rename script for an instance.
777
778     This is a single-node call.
779
780     """
781     return self._SingleNodeCall(node, "instance_run_rename",
782                                 [self._InstDict(inst), old_name, debug])
783
784   @_RpcTimeout(_TMO_URGENT)
785   def call_instance_info(self, node, instance, hname):
786     """Returns information about a single instance.
787
788     This is a single-node call.
789
790     @type node: list
791     @param node: the list of nodes to query
792     @type instance: string
793     @param instance: the instance name
794     @type hname: string
795     @param hname: the hypervisor type of the instance
796
797     """
798     return self._SingleNodeCall(node, "instance_info", [instance, hname])
799
800   @_RpcTimeout(_TMO_NORMAL)
801   def call_instance_migratable(self, node, instance):
802     """Checks whether the given instance can be migrated.
803
804     This is a single-node call.
805
806     @param node: the node to query
807     @type instance: L{objects.Instance}
808     @param instance: the instance to check
809
810
811     """
812     return self._SingleNodeCall(node, "instance_migratable",
813                                 [self._InstDict(instance)])
814
815   @_RpcTimeout(_TMO_URGENT)
816   def call_all_instances_info(self, node_list, hypervisor_list):
817     """Returns information about all instances on the given nodes.
818
819     This is a multi-node call.
820
821     @type node_list: list
822     @param node_list: the list of nodes to query
823     @type hypervisor_list: list
824     @param hypervisor_list: the hypervisors to query for instances
825
826     """
827     return self._MultiNodeCall(node_list, "all_instances_info",
828                                [hypervisor_list])
829
830   @_RpcTimeout(_TMO_URGENT)
831   def call_instance_list(self, node_list, hypervisor_list):
832     """Returns the list of running instances on a given node.
833
834     This is a multi-node call.
835
836     @type node_list: list
837     @param node_list: the list of nodes to query
838     @type hypervisor_list: list
839     @param hypervisor_list: the hypervisors to query for instances
840
841     """
842     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843
844   @_RpcTimeout(_TMO_FAST)
845   def call_node_tcp_ping(self, node, source, target, port, timeout,
846                          live_port_needed):
847     """Do a TcpPing on the remote node
848
849     This is a single-node call.
850
851     """
852     return self._SingleNodeCall(node, "node_tcp_ping",
853                                 [source, target, port, timeout,
854                                  live_port_needed])
855
856   @_RpcTimeout(_TMO_FAST)
857   def call_node_has_ip_address(self, node, address):
858     """Checks if a node has the given IP address.
859
860     This is a single-node call.
861
862     """
863     return self._SingleNodeCall(node, "node_has_ip_address", [address])
864
865   @_RpcTimeout(_TMO_URGENT)
866   def call_node_info(self, node_list, vg_name, hypervisor_type):
867     """Return node information.
868
869     This will return memory information and volume group size and free
870     space.
871
872     This is a multi-node call.
873
874     @type node_list: list
875     @param node_list: the list of nodes to query
876     @type vg_name: C{string}
877     @param vg_name: the name of the volume group to ask for disk space
878         information
879     @type hypervisor_type: C{str}
880     @param hypervisor_type: the name of the hypervisor to ask for
881         memory information
882
883     """
884     return self._MultiNodeCall(node_list, "node_info",
885                                [vg_name, hypervisor_type])
886
887   @_RpcTimeout(_TMO_NORMAL)
888   def call_etc_hosts_modify(self, node, mode, name, ip):
889     """Modify hosts file with name
890
891     @type node: string
892     @param node: The node to call
893     @type mode: string
894     @param mode: The mode to operate. Currently "add" or "remove"
895     @type name: string
896     @param name: The host name to be modified
897     @type ip: string
898     @param ip: The ip of the entry (just valid if mode is "add")
899
900     """
901     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902
903   @_RpcTimeout(_TMO_NORMAL)
904   def call_node_verify(self, node_list, checkdict, cluster_name):
905     """Request verification of given parameters.
906
907     This is a multi-node call.
908
909     """
910     return self._MultiNodeCall(node_list, "node_verify",
911                                [checkdict, cluster_name])
912
913   @classmethod
914   @_RpcTimeout(_TMO_FAST)
915   def call_node_start_master(cls, node, start_daemons, no_voting):
916     """Tells a node to activate itself as a master.
917
918     This is a single-node call.
919
920     """
921     return cls._StaticSingleNodeCall(node, "node_start_master",
922                                      [start_daemons, no_voting])
923
924   @classmethod
925   @_RpcTimeout(_TMO_FAST)
926   def call_node_stop_master(cls, node, stop_daemons):
927     """Tells a node to demote itself from master status.
928
929     This is a single-node call.
930
931     """
932     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933
934   @classmethod
935   @_RpcTimeout(_TMO_URGENT)
936   def call_master_info(cls, node_list):
937     """Query master info.
938
939     This is a multi-node call.
940
941     """
942     # TODO: should this method query down nodes?
943     return cls._StaticMultiNodeCall(node_list, "master_info", [])
944
945   @classmethod
946   @_RpcTimeout(_TMO_URGENT)
947   def call_version(cls, node_list):
948     """Query node version.
949
950     This is a multi-node call.
951
952     """
953     return cls._StaticMultiNodeCall(node_list, "version", [])
954
955   @_RpcTimeout(_TMO_NORMAL)
956   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
957     """Request creation of a given block device.
958
959     This is a single-node call.
960
961     """
962     return self._SingleNodeCall(node, "blockdev_create",
963                                 [bdev.ToDict(), size, owner, on_primary, info])
964
965   @_RpcTimeout(_TMO_SLOW)
966   def call_blockdev_wipe(self, node, bdev, offset, size):
967     """Request wipe at given offset with given size of a block device.
968
969     This is a single-node call.
970
971     """
972     return self._SingleNodeCall(node, "blockdev_wipe",
973                                 [bdev.ToDict(), offset, size])
974
975   @_RpcTimeout(_TMO_NORMAL)
976   def call_blockdev_remove(self, node, bdev):
977     """Request removal of a given block device.
978
979     This is a single-node call.
980
981     """
982     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983
984   @_RpcTimeout(_TMO_NORMAL)
985   def call_blockdev_rename(self, node, devlist):
986     """Request rename of the given block devices.
987
988     This is a single-node call.
989
990     """
991     return self._SingleNodeCall(node, "blockdev_rename",
992                                 [(d.ToDict(), uid) for d, uid in devlist])
993
994   @_RpcTimeout(_TMO_NORMAL)
995   def call_blockdev_assemble(self, node, disk, owner, on_primary):
996     """Request assembling of a given block device.
997
998     This is a single-node call.
999
1000     """
1001     return self._SingleNodeCall(node, "blockdev_assemble",
1002                                 [disk.ToDict(), owner, on_primary])
1003
1004   @_RpcTimeout(_TMO_NORMAL)
1005   def call_blockdev_shutdown(self, node, disk):
1006     """Request shutdown of a given block device.
1007
1008     This is a single-node call.
1009
1010     """
1011     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012
1013   @_RpcTimeout(_TMO_NORMAL)
1014   def call_blockdev_addchildren(self, node, bdev, ndevs):
1015     """Request adding a list of children to a (mirroring) device.
1016
1017     This is a single-node call.
1018
1019     """
1020     return self._SingleNodeCall(node, "blockdev_addchildren",
1021                                 [bdev.ToDict(),
1022                                  [disk.ToDict() for disk in ndevs]])
1023
1024   @_RpcTimeout(_TMO_NORMAL)
1025   def call_blockdev_removechildren(self, node, bdev, ndevs):
1026     """Request removing a list of children from a (mirroring) device.
1027
1028     This is a single-node call.
1029
1030     """
1031     return self._SingleNodeCall(node, "blockdev_removechildren",
1032                                 [bdev.ToDict(),
1033                                  [disk.ToDict() for disk in ndevs]])
1034
1035   @_RpcTimeout(_TMO_NORMAL)
1036   def call_blockdev_getmirrorstatus(self, node, disks):
1037     """Request status of a (mirroring) device.
1038
1039     This is a single-node call.
1040
1041     """
1042     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043                                   [dsk.ToDict() for dsk in disks])
1044     if not result.fail_msg:
1045       result.payload = [objects.BlockDevStatus.FromDict(i)
1046                         for i in result.payload]
1047     return result
1048
1049   @_RpcTimeout(_TMO_NORMAL)
1050   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051     """Request status of (mirroring) devices from multiple nodes.
1052
1053     This is a multi-node call.
1054
1055     """
1056     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057                                  [dict((name, [dsk.ToDict() for dsk in disks])
1058                                        for name, disks in node_disks.items())])
1059     for nres in result.values():
1060       if nres.fail_msg:
1061         continue
1062
1063       for idx, (success, status) in enumerate(nres.payload):
1064         if success:
1065           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1066
1067     return result
1068
1069   @_RpcTimeout(_TMO_NORMAL)
1070   def call_blockdev_find(self, node, disk):
1071     """Request identification of a given block device.
1072
1073     This is a single-node call.
1074
1075     """
1076     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1077     if not result.fail_msg and result.payload is not None:
1078       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1079     return result
1080
1081   @_RpcTimeout(_TMO_NORMAL)
1082   def call_blockdev_close(self, node, instance_name, disks):
1083     """Closes the given block devices.
1084
1085     This is a single-node call.
1086
1087     """
1088     params = [instance_name, [cf.ToDict() for cf in disks]]
1089     return self._SingleNodeCall(node, "blockdev_close", params)
1090
1091   @_RpcTimeout(_TMO_NORMAL)
1092   def call_blockdev_getsizes(self, node, disks):
1093     """Returns the size of the given disks.
1094
1095     This is a single-node call.
1096
1097     """
1098     params = [[cf.ToDict() for cf in disks]]
1099     return self._SingleNodeCall(node, "blockdev_getsize", params)
1100
1101   @_RpcTimeout(_TMO_NORMAL)
1102   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1103     """Disconnects the network of the given drbd devices.
1104
1105     This is a multi-node call.
1106
1107     """
1108     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1109                                [nodes_ip, [cf.ToDict() for cf in disks]])
1110
1111   @_RpcTimeout(_TMO_NORMAL)
1112   def call_drbd_attach_net(self, node_list, nodes_ip,
1113                            disks, instance_name, multimaster):
1114     """Disconnects the given drbd devices.
1115
1116     This is a multi-node call.
1117
1118     """
1119     return self._MultiNodeCall(node_list, "drbd_attach_net",
1120                                [nodes_ip, [cf.ToDict() for cf in disks],
1121                                 instance_name, multimaster])
1122
1123   @_RpcTimeout(_TMO_SLOW)
1124   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1125     """Waits for the synchronization of drbd devices is complete.
1126
1127     This is a multi-node call.
1128
1129     """
1130     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1131                                [nodes_ip, [cf.ToDict() for cf in disks]])
1132
1133   @_RpcTimeout(_TMO_URGENT)
1134   def call_drbd_helper(self, node_list):
1135     """Gets drbd helper.
1136
1137     This is a multi-node call.
1138
1139     """
1140     return self._MultiNodeCall(node_list, "drbd_helper", [])
1141
1142   @classmethod
1143   @_RpcTimeout(_TMO_NORMAL)
1144   def call_upload_file(cls, node_list, file_name, address_list=None):
1145     """Upload a file.
1146
1147     The node will refuse the operation in case the file is not on the
1148     approved file list.
1149
1150     This is a multi-node call.
1151
1152     @type node_list: list
1153     @param node_list: the list of node names to upload to
1154     @type file_name: str
1155     @param file_name: the filename to upload
1156     @type address_list: list or None
1157     @keyword address_list: an optional list of node addresses, in order
1158         to optimize the RPC speed
1159
1160     """
1161     file_contents = utils.ReadFile(file_name)
1162     data = cls._Compress(file_contents)
1163     st = os.stat(file_name)
1164     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1165               st.st_atime, st.st_mtime]
1166     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1167                                     address_list=address_list)
1168
1169   @classmethod
1170   @_RpcTimeout(_TMO_NORMAL)
1171   def call_write_ssconf_files(cls, node_list, values):
1172     """Write ssconf files.
1173
1174     This is a multi-node call.
1175
1176     """
1177     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1178
1179   @_RpcTimeout(_TMO_NORMAL)
1180   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1181     """Runs OOB.
1182
1183     This is a single-node call.
1184
1185     """
1186     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1187                                                   remote_node, timeout])
1188
1189   @_RpcTimeout(_TMO_FAST)
1190   def call_os_diagnose(self, node_list):
1191     """Request a diagnose of OS definitions.
1192
1193     This is a multi-node call.
1194
1195     """
1196     return self._MultiNodeCall(node_list, "os_diagnose", [])
1197
1198   @_RpcTimeout(_TMO_FAST)
1199   def call_os_get(self, node, name):
1200     """Returns an OS definition.
1201
1202     This is a single-node call.
1203
1204     """
1205     result = self._SingleNodeCall(node, "os_get", [name])
1206     if not result.fail_msg and isinstance(result.payload, dict):
1207       result.payload = objects.OS.FromDict(result.payload)
1208     return result
1209
1210   @_RpcTimeout(_TMO_FAST)
1211   def call_os_validate(self, required, nodes, name, checks, params):
1212     """Run a validation routine for a given OS.
1213
1214     This is a multi-node call.
1215
1216     """
1217     return self._MultiNodeCall(nodes, "os_validate",
1218                                [required, name, checks, params])
1219
1220   @_RpcTimeout(_TMO_NORMAL)
1221   def call_hooks_runner(self, node_list, hpath, phase, env):
1222     """Call the hooks runner.
1223
1224     Args:
1225       - op: the OpCode instance
1226       - env: a dictionary with the environment
1227
1228     This is a multi-node call.
1229
1230     """
1231     params = [hpath, phase, env]
1232     return self._MultiNodeCall(node_list, "hooks_runner", params)
1233
1234   @_RpcTimeout(_TMO_NORMAL)
1235   def call_iallocator_runner(self, node, name, idata):
1236     """Call an iallocator on a remote node
1237
1238     Args:
1239       - name: the iallocator name
1240       - input: the json-encoded input string
1241
1242     This is a single-node call.
1243
1244     """
1245     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1246
1247   @_RpcTimeout(_TMO_NORMAL)
1248   def call_blockdev_grow(self, node, cf_bdev, amount):
1249     """Request a snapshot of the given block device.
1250
1251     This is a single-node call.
1252
1253     """
1254     return self._SingleNodeCall(node, "blockdev_grow",
1255                                 [cf_bdev.ToDict(), amount])
1256
1257   @_RpcTimeout(_TMO_1DAY)
1258   def call_blockdev_export(self, node, cf_bdev,
1259                            dest_node, dest_path, cluster_name):
1260     """Export a given disk to another node.
1261
1262     This is a single-node call.
1263
1264     """
1265     return self._SingleNodeCall(node, "blockdev_export",
1266                                 [cf_bdev.ToDict(), dest_node, dest_path,
1267                                  cluster_name])
1268
1269   @_RpcTimeout(_TMO_NORMAL)
1270   def call_blockdev_snapshot(self, node, cf_bdev):
1271     """Request a snapshot of the given block device.
1272
1273     This is a single-node call.
1274
1275     """
1276     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1277
1278   @_RpcTimeout(_TMO_NORMAL)
1279   def call_finalize_export(self, node, instance, snap_disks):
1280     """Request the completion of an export operation.
1281
1282     This writes the export config file, etc.
1283
1284     This is a single-node call.
1285
1286     """
1287     flat_disks = []
1288     for disk in snap_disks:
1289       if isinstance(disk, bool):
1290         flat_disks.append(disk)
1291       else:
1292         flat_disks.append(disk.ToDict())
1293
1294     return self._SingleNodeCall(node, "finalize_export",
1295                                 [self._InstDict(instance), flat_disks])
1296
1297   @_RpcTimeout(_TMO_FAST)
1298   def call_export_info(self, node, path):
1299     """Queries the export information in a given path.
1300
1301     This is a single-node call.
1302
1303     """
1304     return self._SingleNodeCall(node, "export_info", [path])
1305
1306   @_RpcTimeout(_TMO_FAST)
1307   def call_export_list(self, node_list):
1308     """Gets the stored exports list.
1309
1310     This is a multi-node call.
1311
1312     """
1313     return self._MultiNodeCall(node_list, "export_list", [])
1314
1315   @_RpcTimeout(_TMO_FAST)
1316   def call_export_remove(self, node, export):
1317     """Requests removal of a given export.
1318
1319     This is a single-node call.
1320
1321     """
1322     return self._SingleNodeCall(node, "export_remove", [export])
1323
1324   @classmethod
1325   @_RpcTimeout(_TMO_NORMAL)
1326   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1327     """Requests a node to clean the cluster information it has.
1328
1329     This will remove the configuration information from the ganeti data
1330     dir.
1331
1332     This is a single-node call.
1333
1334     """
1335     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1336                                      [modify_ssh_setup])
1337
1338   @_RpcTimeout(_TMO_FAST)
1339   def call_node_volumes(self, node_list):
1340     """Gets all volumes on node(s).
1341
1342     This is a multi-node call.
1343
1344     """
1345     return self._MultiNodeCall(node_list, "node_volumes", [])
1346
1347   @_RpcTimeout(_TMO_FAST)
1348   def call_node_demote_from_mc(self, node):
1349     """Demote a node from the master candidate role.
1350
1351     This is a single-node call.
1352
1353     """
1354     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1355
1356   @_RpcTimeout(_TMO_NORMAL)
1357   def call_node_powercycle(self, node, hypervisor):
1358     """Tries to powercycle a node.
1359
1360     This is a single-node call.
1361
1362     """
1363     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1364
1365   @_RpcTimeout(None)
1366   def call_test_delay(self, node_list, duration):
1367     """Sleep for a fixed time on given node(s).
1368
1369     This is a multi-node call.
1370
1371     """
1372     return self._MultiNodeCall(node_list, "test_delay", [duration],
1373                                read_timeout=int(duration + 5))
1374
1375   @_RpcTimeout(_TMO_FAST)
1376   def call_file_storage_dir_create(self, node, file_storage_dir):
1377     """Create the given file storage directory.
1378
1379     This is a single-node call.
1380
1381     """
1382     return self._SingleNodeCall(node, "file_storage_dir_create",
1383                                 [file_storage_dir])
1384
1385   @_RpcTimeout(_TMO_FAST)
1386   def call_file_storage_dir_remove(self, node, file_storage_dir):
1387     """Remove the given file storage directory.
1388
1389     This is a single-node call.
1390
1391     """
1392     return self._SingleNodeCall(node, "file_storage_dir_remove",
1393                                 [file_storage_dir])
1394
1395   @_RpcTimeout(_TMO_FAST)
1396   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1397                                    new_file_storage_dir):
1398     """Rename file storage directory.
1399
1400     This is a single-node call.
1401
1402     """
1403     return self._SingleNodeCall(node, "file_storage_dir_rename",
1404                                 [old_file_storage_dir, new_file_storage_dir])
1405
1406   @classmethod
1407   @_RpcTimeout(_TMO_FAST)
1408   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1409     """Update job queue.
1410
1411     This is a multi-node call.
1412
1413     """
1414     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1415                                     [file_name, cls._Compress(content)],
1416                                     address_list=address_list)
1417
1418   @classmethod
1419   @_RpcTimeout(_TMO_NORMAL)
1420   def call_jobqueue_purge(cls, node):
1421     """Purge job queue.
1422
1423     This is a single-node call.
1424
1425     """
1426     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1427
1428   @classmethod
1429   @_RpcTimeout(_TMO_FAST)
1430   def call_jobqueue_rename(cls, node_list, address_list, rename):
1431     """Rename a job queue file.
1432
1433     This is a multi-node call.
1434
1435     """
1436     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1437                                     address_list=address_list)
1438
1439   @_RpcTimeout(_TMO_NORMAL)
1440   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1441     """Validate the hypervisor params.
1442
1443     This is a multi-node call.
1444
1445     @type node_list: list
1446     @param node_list: the list of nodes to query
1447     @type hvname: string
1448     @param hvname: the hypervisor name
1449     @type hvparams: dict
1450     @param hvparams: the hypervisor parameters to be validated
1451
1452     """
1453     cluster = self._cfg.GetClusterInfo()
1454     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1455     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1456                                [hvname, hv_full])
1457
1458   @_RpcTimeout(_TMO_NORMAL)
1459   def call_x509_cert_create(self, node, validity):
1460     """Creates a new X509 certificate for SSL/TLS.
1461
1462     This is a single-node call.
1463
1464     @type validity: int
1465     @param validity: Validity in seconds
1466
1467     """
1468     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1469
1470   @_RpcTimeout(_TMO_NORMAL)
1471   def call_x509_cert_remove(self, node, name):
1472     """Removes a X509 certificate.
1473
1474     This is a single-node call.
1475
1476     @type name: string
1477     @param name: Certificate name
1478
1479     """
1480     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1481
1482   @_RpcTimeout(_TMO_NORMAL)
1483   def call_import_start(self, node, opts, instance, dest, dest_args):
1484     """Starts a listener for an import.
1485
1486     This is a single-node call.
1487
1488     @type node: string
1489     @param node: Node name
1490     @type instance: C{objects.Instance}
1491     @param instance: Instance object
1492
1493     """
1494     return self._SingleNodeCall(node, "import_start",
1495                                 [opts.ToDict(),
1496                                  self._InstDict(instance), dest,
1497                                  _EncodeImportExportIO(dest, dest_args)])
1498
1499   @_RpcTimeout(_TMO_NORMAL)
1500   def call_export_start(self, node, opts, host, port,
1501                         instance, source, source_args):
1502     """Starts an export daemon.
1503
1504     This is a single-node call.
1505
1506     @type node: string
1507     @param node: Node name
1508     @type instance: C{objects.Instance}
1509     @param instance: Instance object
1510
1511     """
1512     return self._SingleNodeCall(node, "export_start",
1513                                 [opts.ToDict(), host, port,
1514                                  self._InstDict(instance), source,
1515                                  _EncodeImportExportIO(source, source_args)])
1516
1517   @_RpcTimeout(_TMO_FAST)
1518   def call_impexp_status(self, node, names):
1519     """Gets the status of an import or export.
1520
1521     This is a single-node call.
1522
1523     @type node: string
1524     @param node: Node name
1525     @type names: List of strings
1526     @param names: Import/export names
1527     @rtype: List of L{objects.ImportExportStatus} instances
1528     @return: Returns a list of the state of each named import/export or None if
1529              a status couldn't be retrieved
1530
1531     """
1532     result = self._SingleNodeCall(node, "impexp_status", [names])
1533
1534     if not result.fail_msg:
1535       decoded = []
1536
1537       for i in result.payload:
1538         if i is None:
1539           decoded.append(None)
1540           continue
1541         decoded.append(objects.ImportExportStatus.FromDict(i))
1542
1543       result.payload = decoded
1544
1545     return result
1546
1547   @_RpcTimeout(_TMO_NORMAL)
1548   def call_impexp_abort(self, node, name):
1549     """Aborts an import or export.
1550
1551     This is a single-node call.
1552
1553     @type node: string
1554     @param node: Node name
1555     @type name: string
1556     @param name: Import/export name
1557
1558     """
1559     return self._SingleNodeCall(node, "impexp_abort", [name])
1560
1561   @_RpcTimeout(_TMO_NORMAL)
1562   def call_impexp_cleanup(self, node, name):
1563     """Cleans up after an import or export.
1564
1565     This is a single-node call.
1566
1567     @type node: string
1568     @param node: Node name
1569     @type name: string
1570     @param name: Import/export name
1571
1572     """
1573     return self._SingleNodeCall(node, "impexp_cleanup", [name])