LUClusterVerify: Complain if disk is marked faulty
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   "Expect:",
59   ]
60
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75
76 _TIMEOUTS = {
77 }
78
79
80 def Init():
81   """Initializes the module-global HTTP client manager.
82
83   Must be called before using any RPC function and while exactly one thread is
84   running.
85
86   """
87   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88   # one thread running. This check is just a safety measure -- it doesn't
89   # cover all cases.
90   assert threading.activeCount() == 1, \
91          "Found more than one active thread when initializing pycURL"
92
93   logging.info("Using PycURL %s", pycurl.version)
94
95   pycurl.global_init(pycurl.GLOBAL_ALL)
96
97
98 def Shutdown():
99   """Stops the module-global HTTP client manager.
100
101   Must be called before quitting the program and while exactly one thread is
102   running.
103
104   """
105   pycurl.global_cleanup()
106
107
108 def _ConfigRpcCurl(curl):
109   noded_cert = str(constants.NODED_CERT_FILE)
110
111   curl.setopt(pycurl.FOLLOWLOCATION, False)
112   curl.setopt(pycurl.CAINFO, noded_cert)
113   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114   curl.setopt(pycurl.SSL_VERIFYPEER, True)
115   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116   curl.setopt(pycurl.SSLCERT, noded_cert)
117   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118   curl.setopt(pycurl.SSLKEY, noded_cert)
119   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122 # Aliasing this module avoids the following warning by epydoc: "Warning: No
123 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124 _threading = threading
125
126
127 class _RpcThreadLocal(_threading.local):
128   def GetHttpClientPool(self):
129     """Returns a per-thread HTTP client pool.
130
131     @rtype: L{http.client.HttpClientPool}
132
133     """
134     try:
135       pool = self.hcp
136     except AttributeError:
137       pool = http.client.HttpClientPool(_ConfigRpcCurl)
138       self.hcp = pool
139
140     return pool
141
142
143 # Remove module alias (see above)
144 del _threading
145
146
147 _thread_local = _RpcThreadLocal()
148
149
150 def _RpcTimeout(secs):
151   """Timeout decorator.
152
153   When applied to a rpc call_* function, it updates the global timeout
154   table with the given function/timeout.
155
156   """
157   def decorator(f):
158     name = f.__name__
159     assert name.startswith("call_")
160     _TIMEOUTS[name[len("call_"):]] = secs
161     return f
162   return decorator
163
164
165 def RunWithRPC(fn):
166   """RPC-wrapper decorator.
167
168   When applied to a function, it runs it with the RPC system
169   initialized, and it shutsdown the system afterwards. This means the
170   function must be called without RPC being initialized.
171
172   """
173   def wrapper(*args, **kwargs):
174     Init()
175     try:
176       return fn(*args, **kwargs)
177     finally:
178       Shutdown()
179   return wrapper
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     assert hasattr(self, "call")
230     assert hasattr(self, "data")
231     assert hasattr(self, "fail_msg")
232     assert hasattr(self, "node")
233     assert hasattr(self, "offline")
234     assert hasattr(self, "payload")
235
236   @staticmethod
237   def _EnsureErr(val):
238     """Helper to ensure we return a 'True' value for error."""
239     if val:
240       return val
241     else:
242       return "No error information"
243
244   def Raise(self, msg, prereq=False, ecode=None):
245     """If the result has failed, raise an OpExecError.
246
247     This is used so that LU code doesn't have to check for each
248     result, but instead can call this function.
249
250     """
251     if not self.fail_msg:
252       return
253
254     if not msg: # one could pass None for default message
255       msg = ("Call '%s' to node '%s' has failed: %s" %
256              (self.call, self.node, self.fail_msg))
257     else:
258       msg = "%s: %s" % (msg, self.fail_msg)
259     if prereq:
260       ec = errors.OpPrereqError
261     else:
262       ec = errors.OpExecError
263     if ecode is not None:
264       args = (msg, ecode)
265     else:
266       args = (msg, )
267     raise ec(*args) # pylint: disable-msg=W0142
268
269
270 def _AddressLookup(node_list,
271                    ssc=ssconf.SimpleStore,
272                    nslookup_fn=netutils.Hostname.GetIP):
273   """Return addresses for given node names.
274
275   @type node_list: list
276   @param node_list: List of node names
277   @type ssc: class
278   @param ssc: SimpleStore class that is used to obtain node->ip mappings
279   @type nslookup_fn: callable
280   @param nslookup_fn: function use to do NS lookup
281   @rtype: list of addresses and/or None's
282   @returns: List of corresponding addresses, if found
283
284   """
285   ss = ssc()
286   iplist = ss.GetNodePrimaryIPList()
287   family = ss.GetPrimaryIPFamily()
288   addresses = []
289   ipmap = dict(entry.split() for entry in iplist)
290   for node in node_list:
291     address = ipmap.get(node)
292     if address is None:
293       address = nslookup_fn(node, family=family)
294     addresses.append(address)
295
296   return addresses
297
298
299 class Client:
300   """RPC Client class.
301
302   This class, given a (remote) method name, a list of parameters and a
303   list of nodes, will contact (in parallel) all nodes, and return a
304   dict of results (key: node name, value: result).
305
306   One current bug is that generic failure is still signaled by
307   'False' result, which is not good. This overloading of values can
308   cause bugs.
309
310   """
311   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
312     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
313                                     " timeouts table")
314     self.procedure = procedure
315     self.body = body
316     self.port = port
317     self._request = {}
318     self._address_lookup_fn = address_lookup_fn
319
320   def ConnectList(self, node_list, address_list=None, read_timeout=None):
321     """Add a list of nodes to the target nodes.
322
323     @type node_list: list
324     @param node_list: the list of node names to connect
325     @type address_list: list or None
326     @keyword address_list: either None or a list with node addresses,
327         which must have the same length as the node list
328     @type read_timeout: int
329     @param read_timeout: overwrites default timeout for operation
330
331     """
332     if address_list is None:
333       # Always use IP address instead of node name
334       address_list = self._address_lookup_fn(node_list)
335
336     assert len(node_list) == len(address_list), \
337            "Name and address lists must have the same length"
338
339     for node, address in zip(node_list, address_list):
340       self.ConnectNode(node, address, read_timeout=read_timeout)
341
342   def ConnectNode(self, name, address=None, read_timeout=None):
343     """Add a node to the target list.
344
345     @type name: str
346     @param name: the node name
347     @type address: str
348     @param address: the node address, if known
349     @type read_timeout: int
350     @param read_timeout: overwrites default timeout for operation
351
352     """
353     if address is None:
354       # Always use IP address instead of node name
355       address = self._address_lookup_fn([name])[0]
356
357     assert(address is not None)
358
359     if read_timeout is None:
360       read_timeout = _TIMEOUTS[self.procedure]
361
362     self._request[name] = \
363       http.client.HttpClientRequest(str(address), self.port,
364                                     http.HTTP_PUT, str("/%s" % self.procedure),
365                                     headers=_RPC_CLIENT_HEADERS,
366                                     post_data=str(self.body),
367                                     read_timeout=read_timeout)
368
369   def GetResults(self, http_pool=None):
370     """Call nodes and return results.
371
372     @rtype: list
373     @return: List of RPC results
374
375     """
376     if not http_pool:
377       http_pool = _thread_local.GetHttpClientPool()
378
379     http_pool.ProcessRequests(self._request.values())
380
381     results = {}
382
383     for name, req in self._request.iteritems():
384       if req.success and req.resp_status_code == http.HTTP_OK:
385         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386                                   node=name, call=self.procedure)
387         continue
388
389       # TODO: Better error reporting
390       if req.error:
391         msg = req.error
392       else:
393         msg = req.resp_body
394
395       logging.error("RPC error in %s from node %s: %s",
396                     self.procedure, name, msg)
397       results[name] = RpcResult(data=msg, failed=True, node=name,
398                                 call=self.procedure)
399
400     return results
401
402
403 def _EncodeImportExportIO(ieio, ieioargs):
404   """Encodes import/export I/O information.
405
406   """
407   if ieio == constants.IEIO_RAW_DISK:
408     assert len(ieioargs) == 1
409     return (ieioargs[0].ToDict(), )
410
411   if ieio == constants.IEIO_SCRIPT:
412     assert len(ieioargs) == 2
413     return (ieioargs[0].ToDict(), ieioargs[1])
414
415   return ieioargs
416
417
418 class RpcRunner(object):
419   """RPC runner class"""
420
421   def __init__(self, cfg):
422     """Initialized the rpc runner.
423
424     @type cfg:  C{config.ConfigWriter}
425     @param cfg: the configuration object that will be used to get data
426                 about the cluster
427
428     """
429     self._cfg = cfg
430     self.port = netutils.GetDaemonPort(constants.NODED)
431
432   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
433     """Convert the given instance to a dict.
434
435     This is done via the instance's ToDict() method and additionally
436     we fill the hvparams with the cluster defaults.
437
438     @type instance: L{objects.Instance}
439     @param instance: an Instance object
440     @type hvp: dict or None
441     @param hvp: a dictionary with overridden hypervisor parameters
442     @type bep: dict or None
443     @param bep: a dictionary with overridden backend parameters
444     @type osp: dict or None
445     @param osp: a dictionary with overridden os parameters
446     @rtype: dict
447     @return: the instance dict, with the hvparams filled with the
448         cluster defaults
449
450     """
451     idict = instance.ToDict()
452     cluster = self._cfg.GetClusterInfo()
453     idict["hvparams"] = cluster.FillHV(instance)
454     if hvp is not None:
455       idict["hvparams"].update(hvp)
456     idict["beparams"] = cluster.FillBE(instance)
457     if bep is not None:
458       idict["beparams"].update(bep)
459     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
460     if osp is not None:
461       idict["osparams"].update(osp)
462     for nic in idict["nics"]:
463       nic['nicparams'] = objects.FillDict(
464         cluster.nicparams[constants.PP_DEFAULT],
465         nic['nicparams'])
466     return idict
467
468   def _ConnectList(self, client, node_list, call, read_timeout=None):
469     """Helper for computing node addresses.
470
471     @type client: L{ganeti.rpc.Client}
472     @param client: a C{Client} instance
473     @type node_list: list
474     @param node_list: the node list we should connect
475     @type call: string
476     @param call: the name of the remote procedure call, for filling in
477         correctly any eventual offline nodes' results
478     @type read_timeout: int
479     @param read_timeout: overwrites the default read timeout for the
480         given operation
481
482     """
483     all_nodes = self._cfg.GetAllNodesInfo()
484     name_list = []
485     addr_list = []
486     skip_dict = {}
487     for node in node_list:
488       if node in all_nodes:
489         if all_nodes[node].offline:
490           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
491           continue
492         val = all_nodes[node].primary_ip
493       else:
494         val = None
495       addr_list.append(val)
496       name_list.append(node)
497     if name_list:
498       client.ConnectList(name_list, address_list=addr_list,
499                          read_timeout=read_timeout)
500     return skip_dict
501
502   def _ConnectNode(self, client, node, call, read_timeout=None):
503     """Helper for computing one node's address.
504
505     @type client: L{ganeti.rpc.Client}
506     @param client: a C{Client} instance
507     @type node: str
508     @param node: the node we should connect
509     @type call: string
510     @param call: the name of the remote procedure call, for filling in
511         correctly any eventual offline nodes' results
512     @type read_timeout: int
513     @param read_timeout: overwrites the default read timeout for the
514         given operation
515
516     """
517     node_info = self._cfg.GetNodeInfo(node)
518     if node_info is not None:
519       if node_info.offline:
520         return RpcResult(node=node, offline=True, call=call)
521       addr = node_info.primary_ip
522     else:
523       addr = None
524     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
525
526   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
527     """Helper for making a multi-node call
528
529     """
530     body = serializer.DumpJson(args, indent=False)
531     c = Client(procedure, body, self.port)
532     skip_dict = self._ConnectList(c, node_list, procedure,
533                                   read_timeout=read_timeout)
534     skip_dict.update(c.GetResults())
535     return skip_dict
536
537   @classmethod
538   def _StaticMultiNodeCall(cls, node_list, procedure, args,
539                            address_list=None, read_timeout=None):
540     """Helper for making a multi-node static call
541
542     """
543     body = serializer.DumpJson(args, indent=False)
544     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
545     c.ConnectList(node_list, address_list=address_list,
546                   read_timeout=read_timeout)
547     return c.GetResults()
548
549   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
550     """Helper for making a single-node call
551
552     """
553     body = serializer.DumpJson(args, indent=False)
554     c = Client(procedure, body, self.port)
555     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
556     if result is None:
557       # we did connect, node is not offline
558       result = c.GetResults()[node]
559     return result
560
561   @classmethod
562   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
563     """Helper for making a single-node static call
564
565     """
566     body = serializer.DumpJson(args, indent=False)
567     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
568     c.ConnectNode(node, read_timeout=read_timeout)
569     return c.GetResults()[node]
570
571   @staticmethod
572   def _Compress(data):
573     """Compresses a string for transport over RPC.
574
575     Small amounts of data are not compressed.
576
577     @type data: str
578     @param data: Data
579     @rtype: tuple
580     @return: Encoded data to send
581
582     """
583     # Small amounts of data are not compressed
584     if len(data) < 512:
585       return (constants.RPC_ENCODING_NONE, data)
586
587     # Compress with zlib and encode in base64
588     return (constants.RPC_ENCODING_ZLIB_BASE64,
589             base64.b64encode(zlib.compress(data, 3)))
590
591   #
592   # Begin RPC calls
593   #
594
595   @_RpcTimeout(_TMO_URGENT)
596   def call_lv_list(self, node_list, vg_name):
597     """Gets the logical volumes present in a given volume group.
598
599     This is a multi-node call.
600
601     """
602     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603
604   @_RpcTimeout(_TMO_URGENT)
605   def call_vg_list(self, node_list):
606     """Gets the volume group list.
607
608     This is a multi-node call.
609
610     """
611     return self._MultiNodeCall(node_list, "vg_list", [])
612
613   @_RpcTimeout(_TMO_NORMAL)
614   def call_storage_list(self, node_list, su_name, su_args, name, fields):
615     """Get list of storage units.
616
617     This is a multi-node call.
618
619     """
620     return self._MultiNodeCall(node_list, "storage_list",
621                                [su_name, su_args, name, fields])
622
623   @_RpcTimeout(_TMO_NORMAL)
624   def call_storage_modify(self, node, su_name, su_args, name, changes):
625     """Modify a storage unit.
626
627     This is a single-node call.
628
629     """
630     return self._SingleNodeCall(node, "storage_modify",
631                                 [su_name, su_args, name, changes])
632
633   @_RpcTimeout(_TMO_NORMAL)
634   def call_storage_execute(self, node, su_name, su_args, name, op):
635     """Executes an operation on a storage unit.
636
637     This is a single-node call.
638
639     """
640     return self._SingleNodeCall(node, "storage_execute",
641                                 [su_name, su_args, name, op])
642
643   @_RpcTimeout(_TMO_URGENT)
644   def call_bridges_exist(self, node, bridges_list):
645     """Checks if a node has all the bridges given.
646
647     This method checks if all bridges given in the bridges_list are
648     present on the remote node, so that an instance that uses interfaces
649     on those bridges can be started.
650
651     This is a single-node call.
652
653     """
654     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655
656   @_RpcTimeout(_TMO_NORMAL)
657   def call_instance_start(self, node, instance, hvp, bep):
658     """Starts an instance.
659
660     This is a single-node call.
661
662     """
663     idict = self._InstDict(instance, hvp=hvp, bep=bep)
664     return self._SingleNodeCall(node, "instance_start", [idict])
665
666   @_RpcTimeout(_TMO_NORMAL)
667   def call_instance_shutdown(self, node, instance, timeout):
668     """Stops an instance.
669
670     This is a single-node call.
671
672     """
673     return self._SingleNodeCall(node, "instance_shutdown",
674                                 [self._InstDict(instance), timeout])
675
676   @_RpcTimeout(_TMO_NORMAL)
677   def call_migration_info(self, node, instance):
678     """Gather the information necessary to prepare an instance migration.
679
680     This is a single-node call.
681
682     @type node: string
683     @param node: the node on which the instance is currently running
684     @type instance: C{objects.Instance}
685     @param instance: the instance definition
686
687     """
688     return self._SingleNodeCall(node, "migration_info",
689                                 [self._InstDict(instance)])
690
691   @_RpcTimeout(_TMO_NORMAL)
692   def call_accept_instance(self, node, instance, info, target):
693     """Prepare a node to accept an instance.
694
695     This is a single-node call.
696
697     @type node: string
698     @param node: the target node for the migration
699     @type instance: C{objects.Instance}
700     @param instance: the instance definition
701     @type info: opaque/hypervisor specific (string/data)
702     @param info: result for the call_migration_info call
703     @type target: string
704     @param target: target hostname (usually ip address) (on the node itself)
705
706     """
707     return self._SingleNodeCall(node, "accept_instance",
708                                 [self._InstDict(instance), info, target])
709
710   @_RpcTimeout(_TMO_NORMAL)
711   def call_finalize_migration(self, node, instance, info, success):
712     """Finalize any target-node migration specific operation.
713
714     This is called both in case of a successful migration and in case of error
715     (in which case it should abort the migration).
716
717     This is a single-node call.
718
719     @type node: string
720     @param node: the target node for the migration
721     @type instance: C{objects.Instance}
722     @param instance: the instance definition
723     @type info: opaque/hypervisor specific (string/data)
724     @param info: result for the call_migration_info call
725     @type success: boolean
726     @param success: whether the migration was a success or a failure
727
728     """
729     return self._SingleNodeCall(node, "finalize_migration",
730                                 [self._InstDict(instance), info, success])
731
732   @_RpcTimeout(_TMO_SLOW)
733   def call_instance_migrate(self, node, instance, target, live):
734     """Migrate an instance.
735
736     This is a single-node call.
737
738     @type node: string
739     @param node: the node on which the instance is currently running
740     @type instance: C{objects.Instance}
741     @param instance: the instance definition
742     @type target: string
743     @param target: the target node name
744     @type live: boolean
745     @param live: whether the migration should be done live or not (the
746         interpretation of this parameter is left to the hypervisor)
747
748     """
749     return self._SingleNodeCall(node, "instance_migrate",
750                                 [self._InstDict(instance), target, live])
751
752   @_RpcTimeout(_TMO_NORMAL)
753   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
754     """Reboots an instance.
755
756     This is a single-node call.
757
758     """
759     return self._SingleNodeCall(node, "instance_reboot",
760                                 [self._InstDict(inst), reboot_type,
761                                  shutdown_timeout])
762
763   @_RpcTimeout(_TMO_1DAY)
764   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
765     """Installs an OS on the given instance.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "instance_os_add",
771                                 [self._InstDict(inst, osp=osparams),
772                                  reinstall, debug])
773
774   @_RpcTimeout(_TMO_SLOW)
775   def call_instance_run_rename(self, node, inst, old_name, debug):
776     """Run the OS rename script for an instance.
777
778     This is a single-node call.
779
780     """
781     return self._SingleNodeCall(node, "instance_run_rename",
782                                 [self._InstDict(inst), old_name, debug])
783
784   @_RpcTimeout(_TMO_URGENT)
785   def call_instance_info(self, node, instance, hname):
786     """Returns information about a single instance.
787
788     This is a single-node call.
789
790     @type node: list
791     @param node: the list of nodes to query
792     @type instance: string
793     @param instance: the instance name
794     @type hname: string
795     @param hname: the hypervisor type of the instance
796
797     """
798     return self._SingleNodeCall(node, "instance_info", [instance, hname])
799
800   @_RpcTimeout(_TMO_NORMAL)
801   def call_instance_migratable(self, node, instance):
802     """Checks whether the given instance can be migrated.
803
804     This is a single-node call.
805
806     @param node: the node to query
807     @type instance: L{objects.Instance}
808     @param instance: the instance to check
809
810
811     """
812     return self._SingleNodeCall(node, "instance_migratable",
813                                 [self._InstDict(instance)])
814
815   @_RpcTimeout(_TMO_URGENT)
816   def call_all_instances_info(self, node_list, hypervisor_list):
817     """Returns information about all instances on the given nodes.
818
819     This is a multi-node call.
820
821     @type node_list: list
822     @param node_list: the list of nodes to query
823     @type hypervisor_list: list
824     @param hypervisor_list: the hypervisors to query for instances
825
826     """
827     return self._MultiNodeCall(node_list, "all_instances_info",
828                                [hypervisor_list])
829
830   @_RpcTimeout(_TMO_URGENT)
831   def call_instance_list(self, node_list, hypervisor_list):
832     """Returns the list of running instances on a given node.
833
834     This is a multi-node call.
835
836     @type node_list: list
837     @param node_list: the list of nodes to query
838     @type hypervisor_list: list
839     @param hypervisor_list: the hypervisors to query for instances
840
841     """
842     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843
844   @_RpcTimeout(_TMO_FAST)
845   def call_node_tcp_ping(self, node, source, target, port, timeout,
846                          live_port_needed):
847     """Do a TcpPing on the remote node
848
849     This is a single-node call.
850
851     """
852     return self._SingleNodeCall(node, "node_tcp_ping",
853                                 [source, target, port, timeout,
854                                  live_port_needed])
855
856   @_RpcTimeout(_TMO_FAST)
857   def call_node_has_ip_address(self, node, address):
858     """Checks if a node has the given IP address.
859
860     This is a single-node call.
861
862     """
863     return self._SingleNodeCall(node, "node_has_ip_address", [address])
864
865   @_RpcTimeout(_TMO_URGENT)
866   def call_node_info(self, node_list, vg_name, hypervisor_type):
867     """Return node information.
868
869     This will return memory information and volume group size and free
870     space.
871
872     This is a multi-node call.
873
874     @type node_list: list
875     @param node_list: the list of nodes to query
876     @type vg_name: C{string}
877     @param vg_name: the name of the volume group to ask for disk space
878         information
879     @type hypervisor_type: C{str}
880     @param hypervisor_type: the name of the hypervisor to ask for
881         memory information
882
883     """
884     return self._MultiNodeCall(node_list, "node_info",
885                                [vg_name, hypervisor_type])
886
887   @_RpcTimeout(_TMO_NORMAL)
888   def call_etc_hosts_modify(self, node, mode, name, ip):
889     """Modify hosts file with name
890
891     @type node: string
892     @param node: The node to call
893     @type mode: string
894     @param mode: The mode to operate. Currently "add" or "remove"
895     @type name: string
896     @param name: The host name to be modified
897     @type ip: string
898     @param ip: The ip of the entry (just valid if mode is "add")
899
900     """
901     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902
903   @_RpcTimeout(_TMO_NORMAL)
904   def call_node_verify(self, node_list, checkdict, cluster_name):
905     """Request verification of given parameters.
906
907     This is a multi-node call.
908
909     """
910     return self._MultiNodeCall(node_list, "node_verify",
911                                [checkdict, cluster_name])
912
913   @classmethod
914   @_RpcTimeout(_TMO_FAST)
915   def call_node_start_master(cls, node, start_daemons, no_voting):
916     """Tells a node to activate itself as a master.
917
918     This is a single-node call.
919
920     """
921     return cls._StaticSingleNodeCall(node, "node_start_master",
922                                      [start_daemons, no_voting])
923
924   @classmethod
925   @_RpcTimeout(_TMO_FAST)
926   def call_node_stop_master(cls, node, stop_daemons):
927     """Tells a node to demote itself from master status.
928
929     This is a single-node call.
930
931     """
932     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933
934   @classmethod
935   @_RpcTimeout(_TMO_URGENT)
936   def call_master_info(cls, node_list):
937     """Query master info.
938
939     This is a multi-node call.
940
941     """
942     # TODO: should this method query down nodes?
943     return cls._StaticMultiNodeCall(node_list, "master_info", [])
944
945   @classmethod
946   @_RpcTimeout(_TMO_URGENT)
947   def call_version(cls, node_list):
948     """Query node version.
949
950     This is a multi-node call.
951
952     """
953     return cls._StaticMultiNodeCall(node_list, "version", [])
954
955   @_RpcTimeout(_TMO_NORMAL)
956   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
957     """Request creation of a given block device.
958
959     This is a single-node call.
960
961     """
962     return self._SingleNodeCall(node, "blockdev_create",
963                                 [bdev.ToDict(), size, owner, on_primary, info])
964
965   @_RpcTimeout(_TMO_SLOW)
966   def call_blockdev_wipe(self, node, bdev, offset, size):
967     """Request wipe at given offset with given size of a block device.
968
969     This is a single-node call.
970
971     """
972     return self._SingleNodeCall(node, "blockdev_wipe",
973                                 [bdev.ToDict(), offset, size])
974
975   @_RpcTimeout(_TMO_NORMAL)
976   def call_blockdev_remove(self, node, bdev):
977     """Request removal of a given block device.
978
979     This is a single-node call.
980
981     """
982     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983
984   @_RpcTimeout(_TMO_NORMAL)
985   def call_blockdev_rename(self, node, devlist):
986     """Request rename of the given block devices.
987
988     This is a single-node call.
989
990     """
991     return self._SingleNodeCall(node, "blockdev_rename",
992                                 [(d.ToDict(), uid) for d, uid in devlist])
993
994   @_RpcTimeout(_TMO_NORMAL)
995   def call_blockdev_assemble(self, node, disk, owner, on_primary):
996     """Request assembling of a given block device.
997
998     This is a single-node call.
999
1000     """
1001     return self._SingleNodeCall(node, "blockdev_assemble",
1002                                 [disk.ToDict(), owner, on_primary])
1003
1004   @_RpcTimeout(_TMO_NORMAL)
1005   def call_blockdev_shutdown(self, node, disk):
1006     """Request shutdown of a given block device.
1007
1008     This is a single-node call.
1009
1010     """
1011     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012
1013   @_RpcTimeout(_TMO_NORMAL)
1014   def call_blockdev_addchildren(self, node, bdev, ndevs):
1015     """Request adding a list of children to a (mirroring) device.
1016
1017     This is a single-node call.
1018
1019     """
1020     return self._SingleNodeCall(node, "blockdev_addchildren",
1021                                 [bdev.ToDict(),
1022                                  [disk.ToDict() for disk in ndevs]])
1023
1024   @_RpcTimeout(_TMO_NORMAL)
1025   def call_blockdev_removechildren(self, node, bdev, ndevs):
1026     """Request removing a list of children from a (mirroring) device.
1027
1028     This is a single-node call.
1029
1030     """
1031     return self._SingleNodeCall(node, "blockdev_removechildren",
1032                                 [bdev.ToDict(),
1033                                  [disk.ToDict() for disk in ndevs]])
1034
1035   @_RpcTimeout(_TMO_NORMAL)
1036   def call_blockdev_getmirrorstatus(self, node, disks):
1037     """Request status of a (mirroring) device.
1038
1039     This is a single-node call.
1040
1041     """
1042     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043                                   [dsk.ToDict() for dsk in disks])
1044     if not result.fail_msg:
1045       result.payload = [objects.BlockDevStatus.FromDict(i)
1046                         for i in result.payload]
1047     return result
1048
1049   @_RpcTimeout(_TMO_NORMAL)
1050   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051     """Request status of (mirroring) devices from multiple nodes.
1052
1053     This is a multi-node call.
1054
1055     """
1056     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057                                  [dict((name, [dsk.ToDict() for dsk in disks])
1058                                        for name, disks in node_disks.items())])
1059     for nres in result.values():
1060       if not nres.fail_msg:
1061         nres.payload = [objects.BlockDevStatus.FromDict(i)
1062                         for i in nres.payload]
1063     return result
1064
1065   @_RpcTimeout(_TMO_NORMAL)
1066   def call_blockdev_find(self, node, disk):
1067     """Request identification of a given block device.
1068
1069     This is a single-node call.
1070
1071     """
1072     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1073     if not result.fail_msg and result.payload is not None:
1074       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1075     return result
1076
1077   @_RpcTimeout(_TMO_NORMAL)
1078   def call_blockdev_close(self, node, instance_name, disks):
1079     """Closes the given block devices.
1080
1081     This is a single-node call.
1082
1083     """
1084     params = [instance_name, [cf.ToDict() for cf in disks]]
1085     return self._SingleNodeCall(node, "blockdev_close", params)
1086
1087   @_RpcTimeout(_TMO_NORMAL)
1088   def call_blockdev_getsizes(self, node, disks):
1089     """Returns the size of the given disks.
1090
1091     This is a single-node call.
1092
1093     """
1094     params = [[cf.ToDict() for cf in disks]]
1095     return self._SingleNodeCall(node, "blockdev_getsize", params)
1096
1097   @_RpcTimeout(_TMO_NORMAL)
1098   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1099     """Disconnects the network of the given drbd devices.
1100
1101     This is a multi-node call.
1102
1103     """
1104     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1105                                [nodes_ip, [cf.ToDict() for cf in disks]])
1106
1107   @_RpcTimeout(_TMO_NORMAL)
1108   def call_drbd_attach_net(self, node_list, nodes_ip,
1109                            disks, instance_name, multimaster):
1110     """Disconnects the given drbd devices.
1111
1112     This is a multi-node call.
1113
1114     """
1115     return self._MultiNodeCall(node_list, "drbd_attach_net",
1116                                [nodes_ip, [cf.ToDict() for cf in disks],
1117                                 instance_name, multimaster])
1118
1119   @_RpcTimeout(_TMO_SLOW)
1120   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1121     """Waits for the synchronization of drbd devices is complete.
1122
1123     This is a multi-node call.
1124
1125     """
1126     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1127                                [nodes_ip, [cf.ToDict() for cf in disks]])
1128
1129   @_RpcTimeout(_TMO_URGENT)
1130   def call_drbd_helper(self, node_list):
1131     """Gets drbd helper.
1132
1133     This is a multi-node call.
1134
1135     """
1136     return self._MultiNodeCall(node_list, "drbd_helper", [])
1137
1138   @classmethod
1139   @_RpcTimeout(_TMO_NORMAL)
1140   def call_upload_file(cls, node_list, file_name, address_list=None):
1141     """Upload a file.
1142
1143     The node will refuse the operation in case the file is not on the
1144     approved file list.
1145
1146     This is a multi-node call.
1147
1148     @type node_list: list
1149     @param node_list: the list of node names to upload to
1150     @type file_name: str
1151     @param file_name: the filename to upload
1152     @type address_list: list or None
1153     @keyword address_list: an optional list of node addresses, in order
1154         to optimize the RPC speed
1155
1156     """
1157     file_contents = utils.ReadFile(file_name)
1158     data = cls._Compress(file_contents)
1159     st = os.stat(file_name)
1160     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1161               st.st_atime, st.st_mtime]
1162     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1163                                     address_list=address_list)
1164
1165   @classmethod
1166   @_RpcTimeout(_TMO_NORMAL)
1167   def call_write_ssconf_files(cls, node_list, values):
1168     """Write ssconf files.
1169
1170     This is a multi-node call.
1171
1172     """
1173     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1174
1175   @_RpcTimeout(_TMO_FAST)
1176   def call_os_diagnose(self, node_list):
1177     """Request a diagnose of OS definitions.
1178
1179     This is a multi-node call.
1180
1181     """
1182     return self._MultiNodeCall(node_list, "os_diagnose", [])
1183
1184   @_RpcTimeout(_TMO_FAST)
1185   def call_os_get(self, node, name):
1186     """Returns an OS definition.
1187
1188     This is a single-node call.
1189
1190     """
1191     result = self._SingleNodeCall(node, "os_get", [name])
1192     if not result.fail_msg and isinstance(result.payload, dict):
1193       result.payload = objects.OS.FromDict(result.payload)
1194     return result
1195
1196   @_RpcTimeout(_TMO_FAST)
1197   def call_os_validate(self, required, nodes, name, checks, params):
1198     """Run a validation routine for a given OS.
1199
1200     This is a multi-node call.
1201
1202     """
1203     return self._MultiNodeCall(nodes, "os_validate",
1204                                [required, name, checks, params])
1205
1206   @_RpcTimeout(_TMO_NORMAL)
1207   def call_hooks_runner(self, node_list, hpath, phase, env):
1208     """Call the hooks runner.
1209
1210     Args:
1211       - op: the OpCode instance
1212       - env: a dictionary with the environment
1213
1214     This is a multi-node call.
1215
1216     """
1217     params = [hpath, phase, env]
1218     return self._MultiNodeCall(node_list, "hooks_runner", params)
1219
1220   @_RpcTimeout(_TMO_NORMAL)
1221   def call_iallocator_runner(self, node, name, idata):
1222     """Call an iallocator on a remote node
1223
1224     Args:
1225       - name: the iallocator name
1226       - input: the json-encoded input string
1227
1228     This is a single-node call.
1229
1230     """
1231     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1232
1233   @_RpcTimeout(_TMO_NORMAL)
1234   def call_blockdev_grow(self, node, cf_bdev, amount):
1235     """Request a snapshot of the given block device.
1236
1237     This is a single-node call.
1238
1239     """
1240     return self._SingleNodeCall(node, "blockdev_grow",
1241                                 [cf_bdev.ToDict(), amount])
1242
1243   @_RpcTimeout(_TMO_1DAY)
1244   def call_blockdev_export(self, node, cf_bdev,
1245                            dest_node, dest_path, cluster_name):
1246     """Export a given disk to another node.
1247
1248     This is a single-node call.
1249
1250     """
1251     return self._SingleNodeCall(node, "blockdev_export",
1252                                 [cf_bdev.ToDict(), dest_node, dest_path,
1253                                  cluster_name])
1254
1255   @_RpcTimeout(_TMO_NORMAL)
1256   def call_blockdev_snapshot(self, node, cf_bdev):
1257     """Request a snapshot of the given block device.
1258
1259     This is a single-node call.
1260
1261     """
1262     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1263
1264   @_RpcTimeout(_TMO_NORMAL)
1265   def call_finalize_export(self, node, instance, snap_disks):
1266     """Request the completion of an export operation.
1267
1268     This writes the export config file, etc.
1269
1270     This is a single-node call.
1271
1272     """
1273     flat_disks = []
1274     for disk in snap_disks:
1275       if isinstance(disk, bool):
1276         flat_disks.append(disk)
1277       else:
1278         flat_disks.append(disk.ToDict())
1279
1280     return self._SingleNodeCall(node, "finalize_export",
1281                                 [self._InstDict(instance), flat_disks])
1282
1283   @_RpcTimeout(_TMO_FAST)
1284   def call_export_info(self, node, path):
1285     """Queries the export information in a given path.
1286
1287     This is a single-node call.
1288
1289     """
1290     return self._SingleNodeCall(node, "export_info", [path])
1291
1292   @_RpcTimeout(_TMO_FAST)
1293   def call_export_list(self, node_list):
1294     """Gets the stored exports list.
1295
1296     This is a multi-node call.
1297
1298     """
1299     return self._MultiNodeCall(node_list, "export_list", [])
1300
1301   @_RpcTimeout(_TMO_FAST)
1302   def call_export_remove(self, node, export):
1303     """Requests removal of a given export.
1304
1305     This is a single-node call.
1306
1307     """
1308     return self._SingleNodeCall(node, "export_remove", [export])
1309
1310   @classmethod
1311   @_RpcTimeout(_TMO_NORMAL)
1312   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1313     """Requests a node to clean the cluster information it has.
1314
1315     This will remove the configuration information from the ganeti data
1316     dir.
1317
1318     This is a single-node call.
1319
1320     """
1321     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1322                                      [modify_ssh_setup])
1323
1324   @_RpcTimeout(_TMO_FAST)
1325   def call_node_volumes(self, node_list):
1326     """Gets all volumes on node(s).
1327
1328     This is a multi-node call.
1329
1330     """
1331     return self._MultiNodeCall(node_list, "node_volumes", [])
1332
1333   @_RpcTimeout(_TMO_FAST)
1334   def call_node_demote_from_mc(self, node):
1335     """Demote a node from the master candidate role.
1336
1337     This is a single-node call.
1338
1339     """
1340     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1341
1342   @_RpcTimeout(_TMO_NORMAL)
1343   def call_node_powercycle(self, node, hypervisor):
1344     """Tries to powercycle a node.
1345
1346     This is a single-node call.
1347
1348     """
1349     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1350
1351   @_RpcTimeout(None)
1352   def call_test_delay(self, node_list, duration):
1353     """Sleep for a fixed time on given node(s).
1354
1355     This is a multi-node call.
1356
1357     """
1358     return self._MultiNodeCall(node_list, "test_delay", [duration],
1359                                read_timeout=int(duration + 5))
1360
1361   @_RpcTimeout(_TMO_FAST)
1362   def call_file_storage_dir_create(self, node, file_storage_dir):
1363     """Create the given file storage directory.
1364
1365     This is a single-node call.
1366
1367     """
1368     return self._SingleNodeCall(node, "file_storage_dir_create",
1369                                 [file_storage_dir])
1370
1371   @_RpcTimeout(_TMO_FAST)
1372   def call_file_storage_dir_remove(self, node, file_storage_dir):
1373     """Remove the given file storage directory.
1374
1375     This is a single-node call.
1376
1377     """
1378     return self._SingleNodeCall(node, "file_storage_dir_remove",
1379                                 [file_storage_dir])
1380
1381   @_RpcTimeout(_TMO_FAST)
1382   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1383                                    new_file_storage_dir):
1384     """Rename file storage directory.
1385
1386     This is a single-node call.
1387
1388     """
1389     return self._SingleNodeCall(node, "file_storage_dir_rename",
1390                                 [old_file_storage_dir, new_file_storage_dir])
1391
1392   @classmethod
1393   @_RpcTimeout(_TMO_FAST)
1394   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1395     """Update job queue.
1396
1397     This is a multi-node call.
1398
1399     """
1400     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1401                                     [file_name, cls._Compress(content)],
1402                                     address_list=address_list)
1403
1404   @classmethod
1405   @_RpcTimeout(_TMO_NORMAL)
1406   def call_jobqueue_purge(cls, node):
1407     """Purge job queue.
1408
1409     This is a single-node call.
1410
1411     """
1412     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1413
1414   @classmethod
1415   @_RpcTimeout(_TMO_FAST)
1416   def call_jobqueue_rename(cls, node_list, address_list, rename):
1417     """Rename a job queue file.
1418
1419     This is a multi-node call.
1420
1421     """
1422     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1423                                     address_list=address_list)
1424
1425   @_RpcTimeout(_TMO_NORMAL)
1426   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1427     """Validate the hypervisor params.
1428
1429     This is a multi-node call.
1430
1431     @type node_list: list
1432     @param node_list: the list of nodes to query
1433     @type hvname: string
1434     @param hvname: the hypervisor name
1435     @type hvparams: dict
1436     @param hvparams: the hypervisor parameters to be validated
1437
1438     """
1439     cluster = self._cfg.GetClusterInfo()
1440     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1441     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1442                                [hvname, hv_full])
1443
1444   @_RpcTimeout(_TMO_NORMAL)
1445   def call_x509_cert_create(self, node, validity):
1446     """Creates a new X509 certificate for SSL/TLS.
1447
1448     This is a single-node call.
1449
1450     @type validity: int
1451     @param validity: Validity in seconds
1452
1453     """
1454     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1455
1456   @_RpcTimeout(_TMO_NORMAL)
1457   def call_x509_cert_remove(self, node, name):
1458     """Removes a X509 certificate.
1459
1460     This is a single-node call.
1461
1462     @type name: string
1463     @param name: Certificate name
1464
1465     """
1466     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1467
1468   @_RpcTimeout(_TMO_NORMAL)
1469   def call_import_start(self, node, opts, instance, dest, dest_args):
1470     """Starts a listener for an import.
1471
1472     This is a single-node call.
1473
1474     @type node: string
1475     @param node: Node name
1476     @type instance: C{objects.Instance}
1477     @param instance: Instance object
1478
1479     """
1480     return self._SingleNodeCall(node, "import_start",
1481                                 [opts.ToDict(),
1482                                  self._InstDict(instance), dest,
1483                                  _EncodeImportExportIO(dest, dest_args)])
1484
1485   @_RpcTimeout(_TMO_NORMAL)
1486   def call_export_start(self, node, opts, host, port,
1487                         instance, source, source_args):
1488     """Starts an export daemon.
1489
1490     This is a single-node call.
1491
1492     @type node: string
1493     @param node: Node name
1494     @type instance: C{objects.Instance}
1495     @param instance: Instance object
1496
1497     """
1498     return self._SingleNodeCall(node, "export_start",
1499                                 [opts.ToDict(), host, port,
1500                                  self._InstDict(instance), source,
1501                                  _EncodeImportExportIO(source, source_args)])
1502
1503   @_RpcTimeout(_TMO_FAST)
1504   def call_impexp_status(self, node, names):
1505     """Gets the status of an import or export.
1506
1507     This is a single-node call.
1508
1509     @type node: string
1510     @param node: Node name
1511     @type names: List of strings
1512     @param names: Import/export names
1513     @rtype: List of L{objects.ImportExportStatus} instances
1514     @return: Returns a list of the state of each named import/export or None if
1515              a status couldn't be retrieved
1516
1517     """
1518     result = self._SingleNodeCall(node, "impexp_status", [names])
1519
1520     if not result.fail_msg:
1521       decoded = []
1522
1523       for i in result.payload:
1524         if i is None:
1525           decoded.append(None)
1526           continue
1527         decoded.append(objects.ImportExportStatus.FromDict(i))
1528
1529       result.payload = decoded
1530
1531     return result
1532
1533   @_RpcTimeout(_TMO_NORMAL)
1534   def call_impexp_abort(self, node, name):
1535     """Aborts an import or export.
1536
1537     This is a single-node call.
1538
1539     @type node: string
1540     @param node: Node name
1541     @type name: string
1542     @param name: Import/export name
1543
1544     """
1545     return self._SingleNodeCall(node, "impexp_abort", [name])
1546
1547   @_RpcTimeout(_TMO_NORMAL)
1548   def call_impexp_cleanup(self, node, name):
1549     """Cleans up after an import or export.
1550
1551     This is a single-node call.
1552
1553     @type node: string
1554     @param node: Node name
1555     @type name: string
1556     @param name: Import/export name
1557
1558     """
1559     return self._SingleNodeCall(node, "impexp_cleanup", [name])