Merge branch 'devel-2.2' into devel-2.3
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   "Expect:",
59   ]
60
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75
76 _TIMEOUTS = {
77 }
78
79
80 def Init():
81   """Initializes the module-global HTTP client manager.
82
83   Must be called before using any RPC function and while exactly one thread is
84   running.
85
86   """
87   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88   # one thread running. This check is just a safety measure -- it doesn't
89   # cover all cases.
90   assert threading.activeCount() == 1, \
91          "Found more than one active thread when initializing pycURL"
92
93   logging.info("Using PycURL %s", pycurl.version)
94
95   pycurl.global_init(pycurl.GLOBAL_ALL)
96
97
98 def Shutdown():
99   """Stops the module-global HTTP client manager.
100
101   Must be called before quitting the program and while exactly one thread is
102   running.
103
104   """
105   pycurl.global_cleanup()
106
107
108 def _ConfigRpcCurl(curl):
109   noded_cert = str(constants.NODED_CERT_FILE)
110
111   curl.setopt(pycurl.FOLLOWLOCATION, False)
112   curl.setopt(pycurl.CAINFO, noded_cert)
113   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114   curl.setopt(pycurl.SSL_VERIFYPEER, True)
115   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116   curl.setopt(pycurl.SSLCERT, noded_cert)
117   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118   curl.setopt(pycurl.SSLKEY, noded_cert)
119   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
125
126
127 class _RpcThreadLocal(_threading.local):
128   def GetHttpClientPool(self):
129     """Returns a per-thread HTTP client pool.
130
131     @rtype: L{http.client.HttpClientPool}
132
133     """
134     try:
135       pool = self.hcp
136     except AttributeError:
137       pool = http.client.HttpClientPool(_ConfigRpcCurl)
138       self.hcp = pool
139
140     return pool
141
142
143 # Remove module alias (see above)
144 del _threading
145
146
147 _thread_local = _RpcThreadLocal()
148
149
150 def _RpcTimeout(secs):
151   """Timeout decorator.
152
153   When applied to a rpc call_* function, it updates the global timeout
154   table with the given function/timeout.
155
156   """
157   def decorator(f):
158     name = f.__name__
159     assert name.startswith("call_")
160     _TIMEOUTS[name[len("call_"):]] = secs
161     return f
162   return decorator
163
164
165 def RunWithRPC(fn):
166   """RPC-wrapper decorator.
167
168   When applied to a function, it runs it with the RPC system
169   initialized, and it shutsdown the system afterwards. This means the
170   function must be called without RPC being initialized.
171
172   """
173   def wrapper(*args, **kwargs):
174     Init()
175     try:
176       return fn(*args, **kwargs)
177     finally:
178       Shutdown()
179   return wrapper
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     assert hasattr(self, "call")
230     assert hasattr(self, "data")
231     assert hasattr(self, "fail_msg")
232     assert hasattr(self, "node")
233     assert hasattr(self, "offline")
234     assert hasattr(self, "payload")
235
236   @staticmethod
237   def _EnsureErr(val):
238     """Helper to ensure we return a 'True' value for error."""
239     if val:
240       return val
241     else:
242       return "No error information"
243
244   def Raise(self, msg, prereq=False, ecode=None):
245     """If the result has failed, raise an OpExecError.
246
247     This is used so that LU code doesn't have to check for each
248     result, but instead can call this function.
249
250     """
251     if not self.fail_msg:
252       return
253
254     if not msg: # one could pass None for default message
255       msg = ("Call '%s' to node '%s' has failed: %s" %
256              (self.call, self.node, self.fail_msg))
257     else:
258       msg = "%s: %s" % (msg, self.fail_msg)
259     if prereq:
260       ec = errors.OpPrereqError
261     else:
262       ec = errors.OpExecError
263     if ecode is not None:
264       args = (msg, ecode)
265     else:
266       args = (msg, )
267     raise ec(*args) # pylint: disable-msg=W0142
268
269
270 def _AddressLookup(node_list,
271                    ssc=ssconf.SimpleStore,
272                    nslookup_fn=netutils.Hostname.GetIP):
273   """Return addresses for given node names.
274
275   @type node_list: list
276   @param node_list: List of node names
277   @type ssc: class
278   @param ssc: SimpleStore class that is used to obtain node->ip mappings
279   @type nslookup_fn: callable
280   @param nslookup_fn: function use to do NS lookup
281   @rtype: list of addresses and/or None's
282   @returns: List of corresponding addresses, if found
283
284   """
285   ss = ssc()
286   iplist = ss.GetNodePrimaryIPList()
287   family = ss.GetPrimaryIPFamily()
288   addresses = []
289   ipmap = dict(entry.split() for entry in iplist)
290   for node in node_list:
291     address = ipmap.get(node)
292     if address is None:
293       address = nslookup_fn(node, family=family)
294     addresses.append(address)
295
296   return addresses
297
298
299 class Client:
300   """RPC Client class.
301
302   This class, given a (remote) method name, a list of parameters and a
303   list of nodes, will contact (in parallel) all nodes, and return a
304   dict of results (key: node name, value: result).
305
306   One current bug is that generic failure is still signaled by
307   'False' result, which is not good. This overloading of values can
308   cause bugs.
309
310   """
311   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
312     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
313                                     " timeouts table")
314     self.procedure = procedure
315     self.body = body
316     self.port = port
317     self._request = {}
318     self._address_lookup_fn = address_lookup_fn
319
320   def ConnectList(self, node_list, address_list=None, read_timeout=None):
321     """Add a list of nodes to the target nodes.
322
323     @type node_list: list
324     @param node_list: the list of node names to connect
325     @type address_list: list or None
326     @keyword address_list: either None or a list with node addresses,
327         which must have the same length as the node list
328     @type read_timeout: int
329     @param read_timeout: overwrites default timeout for operation
330
331     """
332     if address_list is None:
333       # Always use IP address instead of node name
334       address_list = self._address_lookup_fn(node_list)
335
336     assert len(node_list) == len(address_list), \
337            "Name and address lists must have the same length"
338
339     for node, address in zip(node_list, address_list):
340       self.ConnectNode(node, address, read_timeout=read_timeout)
341
342   def ConnectNode(self, name, address=None, read_timeout=None):
343     """Add a node to the target list.
344
345     @type name: str
346     @param name: the node name
347     @type address: str
348     @param address: the node address, if known
349     @type read_timeout: int
350     @param read_timeout: overwrites default timeout for operation
351
352     """
353     if address is None:
354       # Always use IP address instead of node name
355       address = self._address_lookup_fn([name])[0]
356
357     assert(address is not None)
358
359     if read_timeout is None:
360       read_timeout = _TIMEOUTS[self.procedure]
361
362     self._request[name] = \
363       http.client.HttpClientRequest(str(address), self.port,
364                                     http.HTTP_PUT, str("/%s" % self.procedure),
365                                     headers=_RPC_CLIENT_HEADERS,
366                                     post_data=str(self.body),
367                                     read_timeout=read_timeout)
368
369   def GetResults(self, http_pool=None):
370     """Call nodes and return results.
371
372     @rtype: list
373     @return: List of RPC results
374
375     """
376     if not http_pool:
377       http_pool = _thread_local.GetHttpClientPool()
378
379     http_pool.ProcessRequests(self._request.values())
380
381     results = {}
382
383     for name, req in self._request.iteritems():
384       if req.success and req.resp_status_code == http.HTTP_OK:
385         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386                                   node=name, call=self.procedure)
387         continue
388
389       # TODO: Better error reporting
390       if req.error:
391         msg = req.error
392       else:
393         msg = req.resp_body
394
395       logging.error("RPC error in %s from node %s: %s",
396                     self.procedure, name, msg)
397       results[name] = RpcResult(data=msg, failed=True, node=name,
398                                 call=self.procedure)
399
400     return results
401
402
403 def _EncodeImportExportIO(ieio, ieioargs):
404   """Encodes import/export I/O information.
405
406   """
407   if ieio == constants.IEIO_RAW_DISK:
408     assert len(ieioargs) == 1
409     return (ieioargs[0].ToDict(), )
410
411   if ieio == constants.IEIO_SCRIPT:
412     assert len(ieioargs) == 2
413     return (ieioargs[0].ToDict(), ieioargs[1])
414
415   return ieioargs
416
417
418 class RpcRunner(object):
419   """RPC runner class"""
420
421   def __init__(self, cfg):
422     """Initialized the rpc runner.
423
424     @type cfg:  C{config.ConfigWriter}
425     @param cfg: the configuration object that will be used to get data
426                 about the cluster
427
428     """
429     self._cfg = cfg
430     self.port = netutils.GetDaemonPort(constants.NODED)
431
432   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
433     """Convert the given instance to a dict.
434
435     This is done via the instance's ToDict() method and additionally
436     we fill the hvparams with the cluster defaults.
437
438     @type instance: L{objects.Instance}
439     @param instance: an Instance object
440     @type hvp: dict or None
441     @param hvp: a dictionary with overridden hypervisor parameters
442     @type bep: dict or None
443     @param bep: a dictionary with overridden backend parameters
444     @type osp: dict or None
445     @param osp: a dictionary with overridden os parameters
446     @rtype: dict
447     @return: the instance dict, with the hvparams filled with the
448         cluster defaults
449
450     """
451     idict = instance.ToDict()
452     cluster = self._cfg.GetClusterInfo()
453     idict["hvparams"] = cluster.FillHV(instance)
454     if hvp is not None:
455       idict["hvparams"].update(hvp)
456     idict["beparams"] = cluster.FillBE(instance)
457     if bep is not None:
458       idict["beparams"].update(bep)
459     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
460     if osp is not None:
461       idict["osparams"].update(osp)
462     for nic in idict["nics"]:
463       nic['nicparams'] = objects.FillDict(
464         cluster.nicparams[constants.PP_DEFAULT],
465         nic['nicparams'])
466     return idict
467
468   def _ConnectList(self, client, node_list, call, read_timeout=None):
469     """Helper for computing node addresses.
470
471     @type client: L{ganeti.rpc.Client}
472     @param client: a C{Client} instance
473     @type node_list: list
474     @param node_list: the node list we should connect
475     @type call: string
476     @param call: the name of the remote procedure call, for filling in
477         correctly any eventual offline nodes' results
478     @type read_timeout: int
479     @param read_timeout: overwrites the default read timeout for the
480         given operation
481
482     """
483     all_nodes = self._cfg.GetAllNodesInfo()
484     name_list = []
485     addr_list = []
486     skip_dict = {}
487     for node in node_list:
488       if node in all_nodes:
489         if all_nodes[node].offline:
490           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
491           continue
492         val = all_nodes[node].primary_ip
493       else:
494         val = None
495       addr_list.append(val)
496       name_list.append(node)
497     if name_list:
498       client.ConnectList(name_list, address_list=addr_list,
499                          read_timeout=read_timeout)
500     return skip_dict
501
502   def _ConnectNode(self, client, node, call, read_timeout=None):
503     """Helper for computing one node's address.
504
505     @type client: L{ganeti.rpc.Client}
506     @param client: a C{Client} instance
507     @type node: str
508     @param node: the node we should connect
509     @type call: string
510     @param call: the name of the remote procedure call, for filling in
511         correctly any eventual offline nodes' results
512     @type read_timeout: int
513     @param read_timeout: overwrites the default read timeout for the
514         given operation
515
516     """
517     node_info = self._cfg.GetNodeInfo(node)
518     if node_info is not None:
519       if node_info.offline:
520         return RpcResult(node=node, offline=True, call=call)
521       addr = node_info.primary_ip
522     else:
523       addr = None
524     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
525
526   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
527     """Helper for making a multi-node call
528
529     """
530     body = serializer.DumpJson(args, indent=False)
531     c = Client(procedure, body, self.port)
532     skip_dict = self._ConnectList(c, node_list, procedure,
533                                   read_timeout=read_timeout)
534     skip_dict.update(c.GetResults())
535     return skip_dict
536
537   @classmethod
538   def _StaticMultiNodeCall(cls, node_list, procedure, args,
539                            address_list=None, read_timeout=None):
540     """Helper for making a multi-node static call
541
542     """
543     body = serializer.DumpJson(args, indent=False)
544     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
545     c.ConnectList(node_list, address_list=address_list,
546                   read_timeout=read_timeout)
547     return c.GetResults()
548
549   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
550     """Helper for making a single-node call
551
552     """
553     body = serializer.DumpJson(args, indent=False)
554     c = Client(procedure, body, self.port)
555     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
556     if result is None:
557       # we did connect, node is not offline
558       result = c.GetResults()[node]
559     return result
560
561   @classmethod
562   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
563     """Helper for making a single-node static call
564
565     """
566     body = serializer.DumpJson(args, indent=False)
567     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
568     c.ConnectNode(node, read_timeout=read_timeout)
569     return c.GetResults()[node]
570
571   @staticmethod
572   def _Compress(data):
573     """Compresses a string for transport over RPC.
574
575     Small amounts of data are not compressed.
576
577     @type data: str
578     @param data: Data
579     @rtype: tuple
580     @return: Encoded data to send
581
582     """
583     # Small amounts of data are not compressed
584     if len(data) < 512:
585       return (constants.RPC_ENCODING_NONE, data)
586
587     # Compress with zlib and encode in base64
588     return (constants.RPC_ENCODING_ZLIB_BASE64,
589             base64.b64encode(zlib.compress(data, 3)))
590
591   #
592   # Begin RPC calls
593   #
594
595   @_RpcTimeout(_TMO_URGENT)
596   def call_lv_list(self, node_list, vg_name):
597     """Gets the logical volumes present in a given volume group.
598
599     This is a multi-node call.
600
601     """
602     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603
604   @_RpcTimeout(_TMO_URGENT)
605   def call_vg_list(self, node_list):
606     """Gets the volume group list.
607
608     This is a multi-node call.
609
610     """
611     return self._MultiNodeCall(node_list, "vg_list", [])
612
613   @_RpcTimeout(_TMO_NORMAL)
614   def call_storage_list(self, node_list, su_name, su_args, name, fields):
615     """Get list of storage units.
616
617     This is a multi-node call.
618
619     """
620     return self._MultiNodeCall(node_list, "storage_list",
621                                [su_name, su_args, name, fields])
622
623   @_RpcTimeout(_TMO_NORMAL)
624   def call_storage_modify(self, node, su_name, su_args, name, changes):
625     """Modify a storage unit.
626
627     This is a single-node call.
628
629     """
630     return self._SingleNodeCall(node, "storage_modify",
631                                 [su_name, su_args, name, changes])
632
633   @_RpcTimeout(_TMO_NORMAL)
634   def call_storage_execute(self, node, su_name, su_args, name, op):
635     """Executes an operation on a storage unit.
636
637     This is a single-node call.
638
639     """
640     return self._SingleNodeCall(node, "storage_execute",
641                                 [su_name, su_args, name, op])
642
643   @_RpcTimeout(_TMO_URGENT)
644   def call_bridges_exist(self, node, bridges_list):
645     """Checks if a node has all the bridges given.
646
647     This method checks if all bridges given in the bridges_list are
648     present on the remote node, so that an instance that uses interfaces
649     on those bridges can be started.
650
651     This is a single-node call.
652
653     """
654     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655
656   @_RpcTimeout(_TMO_NORMAL)
657   def call_instance_start(self, node, instance, hvp, bep):
658     """Starts an instance.
659
660     This is a single-node call.
661
662     """
663     idict = self._InstDict(instance, hvp=hvp, bep=bep)
664     return self._SingleNodeCall(node, "instance_start", [idict])
665
666   @_RpcTimeout(_TMO_NORMAL)
667   def call_instance_shutdown(self, node, instance, timeout):
668     """Stops an instance.
669
670     This is a single-node call.
671
672     """
673     return self._SingleNodeCall(node, "instance_shutdown",
674                                 [self._InstDict(instance), timeout])
675
676   @_RpcTimeout(_TMO_NORMAL)
677   def call_migration_info(self, node, instance):
678     """Gather the information necessary to prepare an instance migration.
679
680     This is a single-node call.
681
682     @type node: string
683     @param node: the node on which the instance is currently running
684     @type instance: C{objects.Instance}
685     @param instance: the instance definition
686
687     """
688     return self._SingleNodeCall(node, "migration_info",
689                                 [self._InstDict(instance)])
690
691   @_RpcTimeout(_TMO_NORMAL)
692   def call_accept_instance(self, node, instance, info, target):
693     """Prepare a node to accept an instance.
694
695     This is a single-node call.
696
697     @type node: string
698     @param node: the target node for the migration
699     @type instance: C{objects.Instance}
700     @param instance: the instance definition
701     @type info: opaque/hypervisor specific (string/data)
702     @param info: result for the call_migration_info call
703     @type target: string
704     @param target: target hostname (usually ip address) (on the node itself)
705
706     """
707     return self._SingleNodeCall(node, "accept_instance",
708                                 [self._InstDict(instance), info, target])
709
710   @_RpcTimeout(_TMO_NORMAL)
711   def call_finalize_migration(self, node, instance, info, success):
712     """Finalize any target-node migration specific operation.
713
714     This is called both in case of a successful migration and in case of error
715     (in which case it should abort the migration).
716
717     This is a single-node call.
718
719     @type node: string
720     @param node: the target node for the migration
721     @type instance: C{objects.Instance}
722     @param instance: the instance definition
723     @type info: opaque/hypervisor specific (string/data)
724     @param info: result for the call_migration_info call
725     @type success: boolean
726     @param success: whether the migration was a success or a failure
727
728     """
729     return self._SingleNodeCall(node, "finalize_migration",
730                                 [self._InstDict(instance), info, success])
731
732   @_RpcTimeout(_TMO_SLOW)
733   def call_instance_migrate(self, node, instance, target, live):
734     """Migrate an instance.
735
736     This is a single-node call.
737
738     @type node: string
739     @param node: the node on which the instance is currently running
740     @type instance: C{objects.Instance}
741     @param instance: the instance definition
742     @type target: string
743     @param target: the target node name
744     @type live: boolean
745     @param live: whether the migration should be done live or not (the
746         interpretation of this parameter is left to the hypervisor)
747
748     """
749     return self._SingleNodeCall(node, "instance_migrate",
750                                 [self._InstDict(instance), target, live])
751
752   @_RpcTimeout(_TMO_NORMAL)
753   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
754     """Reboots an instance.
755
756     This is a single-node call.
757
758     """
759     return self._SingleNodeCall(node, "instance_reboot",
760                                 [self._InstDict(inst), reboot_type,
761                                  shutdown_timeout])
762
763   @_RpcTimeout(_TMO_1DAY)
764   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
765     """Installs an OS on the given instance.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "instance_os_add",
771                                 [self._InstDict(inst, osp=osparams),
772                                  reinstall, debug])
773
774   @_RpcTimeout(_TMO_SLOW)
775   def call_instance_run_rename(self, node, inst, old_name, debug):
776     """Run the OS rename script for an instance.
777
778     This is a single-node call.
779
780     """
781     return self._SingleNodeCall(node, "instance_run_rename",
782                                 [self._InstDict(inst), old_name, debug])
783
784   @_RpcTimeout(_TMO_URGENT)
785   def call_instance_info(self, node, instance, hname):
786     """Returns information about a single instance.
787
788     This is a single-node call.
789
790     @type node: list
791     @param node: the list of nodes to query
792     @type instance: string
793     @param instance: the instance name
794     @type hname: string
795     @param hname: the hypervisor type of the instance
796
797     """
798     return self._SingleNodeCall(node, "instance_info", [instance, hname])
799
800   @_RpcTimeout(_TMO_NORMAL)
801   def call_instance_migratable(self, node, instance):
802     """Checks whether the given instance can be migrated.
803
804     This is a single-node call.
805
806     @param node: the node to query
807     @type instance: L{objects.Instance}
808     @param instance: the instance to check
809
810
811     """
812     return self._SingleNodeCall(node, "instance_migratable",
813                                 [self._InstDict(instance)])
814
815   @_RpcTimeout(_TMO_URGENT)
816   def call_all_instances_info(self, node_list, hypervisor_list):
817     """Returns information about all instances on the given nodes.
818
819     This is a multi-node call.
820
821     @type node_list: list
822     @param node_list: the list of nodes to query
823     @type hypervisor_list: list
824     @param hypervisor_list: the hypervisors to query for instances
825
826     """
827     return self._MultiNodeCall(node_list, "all_instances_info",
828                                [hypervisor_list])
829
830   @_RpcTimeout(_TMO_URGENT)
831   def call_instance_list(self, node_list, hypervisor_list):
832     """Returns the list of running instances on a given node.
833
834     This is a multi-node call.
835
836     @type node_list: list
837     @param node_list: the list of nodes to query
838     @type hypervisor_list: list
839     @param hypervisor_list: the hypervisors to query for instances
840
841     """
842     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843
844   @_RpcTimeout(_TMO_FAST)
845   def call_node_tcp_ping(self, node, source, target, port, timeout,
846                          live_port_needed):
847     """Do a TcpPing on the remote node
848
849     This is a single-node call.
850
851     """
852     return self._SingleNodeCall(node, "node_tcp_ping",
853                                 [source, target, port, timeout,
854                                  live_port_needed])
855
856   @_RpcTimeout(_TMO_FAST)
857   def call_node_has_ip_address(self, node, address):
858     """Checks if a node has the given IP address.
859
860     This is a single-node call.
861
862     """
863     return self._SingleNodeCall(node, "node_has_ip_address", [address])
864
865   @_RpcTimeout(_TMO_URGENT)
866   def call_node_info(self, node_list, vg_name, hypervisor_type):
867     """Return node information.
868
869     This will return memory information and volume group size and free
870     space.
871
872     This is a multi-node call.
873
874     @type node_list: list
875     @param node_list: the list of nodes to query
876     @type vg_name: C{string}
877     @param vg_name: the name of the volume group to ask for disk space
878         information
879     @type hypervisor_type: C{str}
880     @param hypervisor_type: the name of the hypervisor to ask for
881         memory information
882
883     """
884     return self._MultiNodeCall(node_list, "node_info",
885                                [vg_name, hypervisor_type])
886
887   @_RpcTimeout(_TMO_NORMAL)
888   def call_etc_hosts_modify(self, node, mode, name, ip):
889     """Modify hosts file with name
890
891     @type node: string
892     @param node: The node to call
893     @type mode: string
894     @param mode: The mode to operate. Currently "add" or "remove"
895     @type name: string
896     @param name: The host name to be modified
897     @type ip: string
898     @param ip: The ip of the entry (just valid if mode is "add")
899
900     """
901     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902
903   @_RpcTimeout(_TMO_NORMAL)
904   def call_node_verify(self, node_list, checkdict, cluster_name):
905     """Request verification of given parameters.
906
907     This is a multi-node call.
908
909     """
910     return self._MultiNodeCall(node_list, "node_verify",
911                                [checkdict, cluster_name])
912
913   @classmethod
914   @_RpcTimeout(_TMO_FAST)
915   def call_node_start_master(cls, node, start_daemons, no_voting):
916     """Tells a node to activate itself as a master.
917
918     This is a single-node call.
919
920     """
921     return cls._StaticSingleNodeCall(node, "node_start_master",
922                                      [start_daemons, no_voting])
923
924   @classmethod
925   @_RpcTimeout(_TMO_FAST)
926   def call_node_stop_master(cls, node, stop_daemons):
927     """Tells a node to demote itself from master status.
928
929     This is a single-node call.
930
931     """
932     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933
934   @classmethod
935   @_RpcTimeout(_TMO_URGENT)
936   def call_master_info(cls, node_list):
937     """Query master info.
938
939     This is a multi-node call.
940
941     """
942     # TODO: should this method query down nodes?
943     return cls._StaticMultiNodeCall(node_list, "master_info", [])
944
945   @classmethod
946   @_RpcTimeout(_TMO_URGENT)
947   def call_version(cls, node_list):
948     """Query node version.
949
950     This is a multi-node call.
951
952     """
953     return cls._StaticMultiNodeCall(node_list, "version", [])
954
955   @_RpcTimeout(_TMO_NORMAL)
956   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
957     """Request creation of a given block device.
958
959     This is a single-node call.
960
961     """
962     return self._SingleNodeCall(node, "blockdev_create",
963                                 [bdev.ToDict(), size, owner, on_primary, info])
964
965   @_RpcTimeout(_TMO_SLOW)
966   def call_blockdev_wipe(self, node, bdev, offset, size):
967     """Request wipe at given offset with given size of a block device.
968
969     This is a single-node call.
970
971     """
972     return self._SingleNodeCall(node, "blockdev_wipe",
973                                 [bdev.ToDict(), offset, size])
974
975   @_RpcTimeout(_TMO_NORMAL)
976   def call_blockdev_remove(self, node, bdev):
977     """Request removal of a given block device.
978
979     This is a single-node call.
980
981     """
982     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983
984   @_RpcTimeout(_TMO_NORMAL)
985   def call_blockdev_rename(self, node, devlist):
986     """Request rename of the given block devices.
987
988     This is a single-node call.
989
990     """
991     return self._SingleNodeCall(node, "blockdev_rename",
992                                 [(d.ToDict(), uid) for d, uid in devlist])
993
994   @_RpcTimeout(_TMO_NORMAL)
995   def call_blockdev_assemble(self, node, disk, owner, on_primary):
996     """Request assembling of a given block device.
997
998     This is a single-node call.
999
1000     """
1001     return self._SingleNodeCall(node, "blockdev_assemble",
1002                                 [disk.ToDict(), owner, on_primary])
1003
1004   @_RpcTimeout(_TMO_NORMAL)
1005   def call_blockdev_shutdown(self, node, disk):
1006     """Request shutdown of a given block device.
1007
1008     This is a single-node call.
1009
1010     """
1011     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012
1013   @_RpcTimeout(_TMO_NORMAL)
1014   def call_blockdev_addchildren(self, node, bdev, ndevs):
1015     """Request adding a list of children to a (mirroring) device.
1016
1017     This is a single-node call.
1018
1019     """
1020     return self._SingleNodeCall(node, "blockdev_addchildren",
1021                                 [bdev.ToDict(),
1022                                  [disk.ToDict() for disk in ndevs]])
1023
1024   @_RpcTimeout(_TMO_NORMAL)
1025   def call_blockdev_removechildren(self, node, bdev, ndevs):
1026     """Request removing a list of children from a (mirroring) device.
1027
1028     This is a single-node call.
1029
1030     """
1031     return self._SingleNodeCall(node, "blockdev_removechildren",
1032                                 [bdev.ToDict(),
1033                                  [disk.ToDict() for disk in ndevs]])
1034
1035   @_RpcTimeout(_TMO_NORMAL)
1036   def call_blockdev_getmirrorstatus(self, node, disks):
1037     """Request status of a (mirroring) device.
1038
1039     This is a single-node call.
1040
1041     """
1042     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043                                   [dsk.ToDict() for dsk in disks])
1044     if not result.fail_msg:
1045       result.payload = [objects.BlockDevStatus.FromDict(i)
1046                         for i in result.payload]
1047     return result
1048
1049   @_RpcTimeout(_TMO_NORMAL)
1050   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051     """Request status of (mirroring) devices from multiple nodes.
1052
1053     This is a multi-node call.
1054
1055     """
1056     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057                                  [dict((name, [dsk.ToDict() for dsk in disks])
1058                                        for name, disks in node_disks.items())])
1059     for nres in result.values():
1060       if nres.fail_msg:
1061         continue
1062
1063       for idx, (success, status) in enumerate(nres.payload):
1064         if success:
1065           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1066
1067     return result
1068
1069   @_RpcTimeout(_TMO_NORMAL)
1070   def call_blockdev_find(self, node, disk):
1071     """Request identification of a given block device.
1072
1073     This is a single-node call.
1074
1075     """
1076     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1077     if not result.fail_msg and result.payload is not None:
1078       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1079     return result
1080
1081   @_RpcTimeout(_TMO_NORMAL)
1082   def call_blockdev_close(self, node, instance_name, disks):
1083     """Closes the given block devices.
1084
1085     This is a single-node call.
1086
1087     """
1088     params = [instance_name, [cf.ToDict() for cf in disks]]
1089     return self._SingleNodeCall(node, "blockdev_close", params)
1090
1091   @_RpcTimeout(_TMO_NORMAL)
1092   def call_blockdev_getsize(self, node, disks):
1093     """Returns the size of the given disks.
1094
1095     This is a single-node call.
1096
1097     """
1098     params = [[cf.ToDict() for cf in disks]]
1099     return self._SingleNodeCall(node, "blockdev_getsize", params)
1100
1101   @_RpcTimeout(_TMO_NORMAL)
1102   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1103     """Disconnects the network of the given drbd devices.
1104
1105     This is a multi-node call.
1106
1107     """
1108     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1109                                [nodes_ip, [cf.ToDict() for cf in disks]])
1110
1111   @_RpcTimeout(_TMO_NORMAL)
1112   def call_drbd_attach_net(self, node_list, nodes_ip,
1113                            disks, instance_name, multimaster):
1114     """Disconnects the given drbd devices.
1115
1116     This is a multi-node call.
1117
1118     """
1119     return self._MultiNodeCall(node_list, "drbd_attach_net",
1120                                [nodes_ip, [cf.ToDict() for cf in disks],
1121                                 instance_name, multimaster])
1122
1123   @_RpcTimeout(_TMO_SLOW)
1124   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1125     """Waits for the synchronization of drbd devices is complete.
1126
1127     This is a multi-node call.
1128
1129     """
1130     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1131                                [nodes_ip, [cf.ToDict() for cf in disks]])
1132
1133   @_RpcTimeout(_TMO_URGENT)
1134   def call_drbd_helper(self, node_list):
1135     """Gets drbd helper.
1136
1137     This is a multi-node call.
1138
1139     """
1140     return self._MultiNodeCall(node_list, "drbd_helper", [])
1141
1142   @classmethod
1143   @_RpcTimeout(_TMO_NORMAL)
1144   def call_upload_file(cls, node_list, file_name, address_list=None):
1145     """Upload a file.
1146
1147     The node will refuse the operation in case the file is not on the
1148     approved file list.
1149
1150     This is a multi-node call.
1151
1152     @type node_list: list
1153     @param node_list: the list of node names to upload to
1154     @type file_name: str
1155     @param file_name: the filename to upload
1156     @type address_list: list or None
1157     @keyword address_list: an optional list of node addresses, in order
1158         to optimize the RPC speed
1159
1160     """
1161     file_contents = utils.ReadFile(file_name)
1162     data = cls._Compress(file_contents)
1163     st = os.stat(file_name)
1164     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1165               st.st_atime, st.st_mtime]
1166     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1167                                     address_list=address_list)
1168
1169   @classmethod
1170   @_RpcTimeout(_TMO_NORMAL)
1171   def call_write_ssconf_files(cls, node_list, values):
1172     """Write ssconf files.
1173
1174     This is a multi-node call.
1175
1176     """
1177     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1178
1179   @_RpcTimeout(_TMO_FAST)
1180   def call_os_diagnose(self, node_list):
1181     """Request a diagnose of OS definitions.
1182
1183     This is a multi-node call.
1184
1185     """
1186     return self._MultiNodeCall(node_list, "os_diagnose", [])
1187
1188   @_RpcTimeout(_TMO_FAST)
1189   def call_os_get(self, node, name):
1190     """Returns an OS definition.
1191
1192     This is a single-node call.
1193
1194     """
1195     result = self._SingleNodeCall(node, "os_get", [name])
1196     if not result.fail_msg and isinstance(result.payload, dict):
1197       result.payload = objects.OS.FromDict(result.payload)
1198     return result
1199
1200   @_RpcTimeout(_TMO_FAST)
1201   def call_os_validate(self, required, nodes, name, checks, params):
1202     """Run a validation routine for a given OS.
1203
1204     This is a multi-node call.
1205
1206     """
1207     return self._MultiNodeCall(nodes, "os_validate",
1208                                [required, name, checks, params])
1209
1210   @_RpcTimeout(_TMO_NORMAL)
1211   def call_hooks_runner(self, node_list, hpath, phase, env):
1212     """Call the hooks runner.
1213
1214     Args:
1215       - op: the OpCode instance
1216       - env: a dictionary with the environment
1217
1218     This is a multi-node call.
1219
1220     """
1221     params = [hpath, phase, env]
1222     return self._MultiNodeCall(node_list, "hooks_runner", params)
1223
1224   @_RpcTimeout(_TMO_NORMAL)
1225   def call_iallocator_runner(self, node, name, idata):
1226     """Call an iallocator on a remote node
1227
1228     Args:
1229       - name: the iallocator name
1230       - input: the json-encoded input string
1231
1232     This is a single-node call.
1233
1234     """
1235     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1236
1237   @_RpcTimeout(_TMO_NORMAL)
1238   def call_blockdev_grow(self, node, cf_bdev, amount):
1239     """Request a snapshot of the given block device.
1240
1241     This is a single-node call.
1242
1243     """
1244     return self._SingleNodeCall(node, "blockdev_grow",
1245                                 [cf_bdev.ToDict(), amount])
1246
1247   @_RpcTimeout(_TMO_1DAY)
1248   def call_blockdev_export(self, node, cf_bdev,
1249                            dest_node, dest_path, cluster_name):
1250     """Export a given disk to another node.
1251
1252     This is a single-node call.
1253
1254     """
1255     return self._SingleNodeCall(node, "blockdev_export",
1256                                 [cf_bdev.ToDict(), dest_node, dest_path,
1257                                  cluster_name])
1258
1259   @_RpcTimeout(_TMO_NORMAL)
1260   def call_blockdev_snapshot(self, node, cf_bdev):
1261     """Request a snapshot of the given block device.
1262
1263     This is a single-node call.
1264
1265     """
1266     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1267
1268   @_RpcTimeout(_TMO_NORMAL)
1269   def call_finalize_export(self, node, instance, snap_disks):
1270     """Request the completion of an export operation.
1271
1272     This writes the export config file, etc.
1273
1274     This is a single-node call.
1275
1276     """
1277     flat_disks = []
1278     for disk in snap_disks:
1279       if isinstance(disk, bool):
1280         flat_disks.append(disk)
1281       else:
1282         flat_disks.append(disk.ToDict())
1283
1284     return self._SingleNodeCall(node, "finalize_export",
1285                                 [self._InstDict(instance), flat_disks])
1286
1287   @_RpcTimeout(_TMO_FAST)
1288   def call_export_info(self, node, path):
1289     """Queries the export information in a given path.
1290
1291     This is a single-node call.
1292
1293     """
1294     return self._SingleNodeCall(node, "export_info", [path])
1295
1296   @_RpcTimeout(_TMO_FAST)
1297   def call_export_list(self, node_list):
1298     """Gets the stored exports list.
1299
1300     This is a multi-node call.
1301
1302     """
1303     return self._MultiNodeCall(node_list, "export_list", [])
1304
1305   @_RpcTimeout(_TMO_FAST)
1306   def call_export_remove(self, node, export):
1307     """Requests removal of a given export.
1308
1309     This is a single-node call.
1310
1311     """
1312     return self._SingleNodeCall(node, "export_remove", [export])
1313
1314   @classmethod
1315   @_RpcTimeout(_TMO_NORMAL)
1316   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1317     """Requests a node to clean the cluster information it has.
1318
1319     This will remove the configuration information from the ganeti data
1320     dir.
1321
1322     This is a single-node call.
1323
1324     """
1325     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1326                                      [modify_ssh_setup])
1327
1328   @_RpcTimeout(_TMO_FAST)
1329   def call_node_volumes(self, node_list):
1330     """Gets all volumes on node(s).
1331
1332     This is a multi-node call.
1333
1334     """
1335     return self._MultiNodeCall(node_list, "node_volumes", [])
1336
1337   @_RpcTimeout(_TMO_FAST)
1338   def call_node_demote_from_mc(self, node):
1339     """Demote a node from the master candidate role.
1340
1341     This is a single-node call.
1342
1343     """
1344     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1345
1346   @_RpcTimeout(_TMO_NORMAL)
1347   def call_node_powercycle(self, node, hypervisor):
1348     """Tries to powercycle a node.
1349
1350     This is a single-node call.
1351
1352     """
1353     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1354
1355   @_RpcTimeout(None)
1356   def call_test_delay(self, node_list, duration):
1357     """Sleep for a fixed time on given node(s).
1358
1359     This is a multi-node call.
1360
1361     """
1362     return self._MultiNodeCall(node_list, "test_delay", [duration],
1363                                read_timeout=int(duration + 5))
1364
1365   @_RpcTimeout(_TMO_FAST)
1366   def call_file_storage_dir_create(self, node, file_storage_dir):
1367     """Create the given file storage directory.
1368
1369     This is a single-node call.
1370
1371     """
1372     return self._SingleNodeCall(node, "file_storage_dir_create",
1373                                 [file_storage_dir])
1374
1375   @_RpcTimeout(_TMO_FAST)
1376   def call_file_storage_dir_remove(self, node, file_storage_dir):
1377     """Remove the given file storage directory.
1378
1379     This is a single-node call.
1380
1381     """
1382     return self._SingleNodeCall(node, "file_storage_dir_remove",
1383                                 [file_storage_dir])
1384
1385   @_RpcTimeout(_TMO_FAST)
1386   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1387                                    new_file_storage_dir):
1388     """Rename file storage directory.
1389
1390     This is a single-node call.
1391
1392     """
1393     return self._SingleNodeCall(node, "file_storage_dir_rename",
1394                                 [old_file_storage_dir, new_file_storage_dir])
1395
1396   @classmethod
1397   @_RpcTimeout(_TMO_FAST)
1398   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1399     """Update job queue.
1400
1401     This is a multi-node call.
1402
1403     """
1404     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1405                                     [file_name, cls._Compress(content)],
1406                                     address_list=address_list)
1407
1408   @classmethod
1409   @_RpcTimeout(_TMO_NORMAL)
1410   def call_jobqueue_purge(cls, node):
1411     """Purge job queue.
1412
1413     This is a single-node call.
1414
1415     """
1416     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1417
1418   @classmethod
1419   @_RpcTimeout(_TMO_FAST)
1420   def call_jobqueue_rename(cls, node_list, address_list, rename):
1421     """Rename a job queue file.
1422
1423     This is a multi-node call.
1424
1425     """
1426     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1427                                     address_list=address_list)
1428
1429   @_RpcTimeout(_TMO_NORMAL)
1430   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1431     """Validate the hypervisor params.
1432
1433     This is a multi-node call.
1434
1435     @type node_list: list
1436     @param node_list: the list of nodes to query
1437     @type hvname: string
1438     @param hvname: the hypervisor name
1439     @type hvparams: dict
1440     @param hvparams: the hypervisor parameters to be validated
1441
1442     """
1443     cluster = self._cfg.GetClusterInfo()
1444     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1445     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1446                                [hvname, hv_full])
1447
1448   @_RpcTimeout(_TMO_NORMAL)
1449   def call_x509_cert_create(self, node, validity):
1450     """Creates a new X509 certificate for SSL/TLS.
1451
1452     This is a single-node call.
1453
1454     @type validity: int
1455     @param validity: Validity in seconds
1456
1457     """
1458     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1459
1460   @_RpcTimeout(_TMO_NORMAL)
1461   def call_x509_cert_remove(self, node, name):
1462     """Removes a X509 certificate.
1463
1464     This is a single-node call.
1465
1466     @type name: string
1467     @param name: Certificate name
1468
1469     """
1470     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1471
1472   @_RpcTimeout(_TMO_NORMAL)
1473   def call_import_start(self, node, opts, instance, dest, dest_args):
1474     """Starts a listener for an import.
1475
1476     This is a single-node call.
1477
1478     @type node: string
1479     @param node: Node name
1480     @type instance: C{objects.Instance}
1481     @param instance: Instance object
1482
1483     """
1484     return self._SingleNodeCall(node, "import_start",
1485                                 [opts.ToDict(),
1486                                  self._InstDict(instance), dest,
1487                                  _EncodeImportExportIO(dest, dest_args)])
1488
1489   @_RpcTimeout(_TMO_NORMAL)
1490   def call_export_start(self, node, opts, host, port,
1491                         instance, source, source_args):
1492     """Starts an export daemon.
1493
1494     This is a single-node call.
1495
1496     @type node: string
1497     @param node: Node name
1498     @type instance: C{objects.Instance}
1499     @param instance: Instance object
1500
1501     """
1502     return self._SingleNodeCall(node, "export_start",
1503                                 [opts.ToDict(), host, port,
1504                                  self._InstDict(instance), source,
1505                                  _EncodeImportExportIO(source, source_args)])
1506
1507   @_RpcTimeout(_TMO_FAST)
1508   def call_impexp_status(self, node, names):
1509     """Gets the status of an import or export.
1510
1511     This is a single-node call.
1512
1513     @type node: string
1514     @param node: Node name
1515     @type names: List of strings
1516     @param names: Import/export names
1517     @rtype: List of L{objects.ImportExportStatus} instances
1518     @return: Returns a list of the state of each named import/export or None if
1519              a status couldn't be retrieved
1520
1521     """
1522     result = self._SingleNodeCall(node, "impexp_status", [names])
1523
1524     if not result.fail_msg:
1525       decoded = []
1526
1527       for i in result.payload:
1528         if i is None:
1529           decoded.append(None)
1530           continue
1531         decoded.append(objects.ImportExportStatus.FromDict(i))
1532
1533       result.payload = decoded
1534
1535     return result
1536
1537   @_RpcTimeout(_TMO_NORMAL)
1538   def call_impexp_abort(self, node, name):
1539     """Aborts an import or export.
1540
1541     This is a single-node call.
1542
1543     @type node: string
1544     @param node: Node name
1545     @type name: string
1546     @param name: Import/export name
1547
1548     """
1549     return self._SingleNodeCall(node, "impexp_abort", [name])
1550
1551   @_RpcTimeout(_TMO_NORMAL)
1552   def call_impexp_cleanup(self, node, name):
1553     """Cleans up after an import or export.
1554
1555     This is a single-node call.
1556
1557     @type node: string
1558     @param node: Node name
1559     @type name: string
1560     @param name: Import/export name
1561
1562     """
1563     return self._SingleNodeCall(node, "impexp_cleanup", [name])