jqueue: Fix potential race condition when cancelling queued jobs
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49
50 # pylint has a bug here, doesn't see this import
51 import ganeti.http.client  # pylint: disable-msg=W0611
52
53
54 # Timeout for connecting to nodes (seconds)
55 _RPC_CONNECT_TIMEOUT = 5
56
57 _RPC_CLIENT_HEADERS = [
58   "Content-type: %s" % http.HTTP_APP_JSON,
59   "Expect:",
60   ]
61
62 # Various time constants for the timeout table
63 _TMO_URGENT = 60 # one minute
64 _TMO_FAST = 5 * 60 # five minutes
65 _TMO_NORMAL = 15 * 60 # 15 minutes
66 _TMO_SLOW = 3600 # one hour
67 _TMO_4HRS = 4 * 3600
68 _TMO_1DAY = 86400
69
70 # Timeout table that will be built later by decorators
71 # Guidelines for choosing timeouts:
72 # - call used during watcher: timeout -> 1min, _TMO_URGENT
73 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74 # - other calls: 15 min, _TMO_NORMAL
75 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76
77 _TIMEOUTS = {
78 }
79
80
81 def Init():
82   """Initializes the module-global HTTP client manager.
83
84   Must be called before using any RPC function and while exactly one thread is
85   running.
86
87   """
88   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89   # one thread running. This check is just a safety measure -- it doesn't
90   # cover all cases.
91   assert threading.activeCount() == 1, \
92          "Found more than one active thread when initializing pycURL"
93
94   logging.info("Using PycURL %s", pycurl.version)
95
96   pycurl.global_init(pycurl.GLOBAL_ALL)
97
98
99 def Shutdown():
100   """Stops the module-global HTTP client manager.
101
102   Must be called before quitting the program and while exactly one thread is
103   running.
104
105   """
106   pycurl.global_cleanup()
107
108
109 def _ConfigRpcCurl(curl):
110   noded_cert = str(constants.NODED_CERT_FILE)
111
112   curl.setopt(pycurl.FOLLOWLOCATION, False)
113   curl.setopt(pycurl.CAINFO, noded_cert)
114   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115   curl.setopt(pycurl.SSL_VERIFYPEER, True)
116   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117   curl.setopt(pycurl.SSLCERT, noded_cert)
118   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119   curl.setopt(pycurl.SSLKEY, noded_cert)
120   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121
122
123 # Aliasing this module avoids the following warning by epydoc: "Warning: No
124 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125 _threading = threading
126
127
128 class _RpcThreadLocal(_threading.local):
129   def GetHttpClientPool(self):
130     """Returns a per-thread HTTP client pool.
131
132     @rtype: L{http.client.HttpClientPool}
133
134     """
135     try:
136       pool = self.hcp
137     except AttributeError:
138       pool = http.client.HttpClientPool(_ConfigRpcCurl)
139       self.hcp = pool
140
141     return pool
142
143
144 # Remove module alias (see above)
145 del _threading
146
147
148 _thread_local = _RpcThreadLocal()
149
150
151 def _RpcTimeout(secs):
152   """Timeout decorator.
153
154   When applied to a rpc call_* function, it updates the global timeout
155   table with the given function/timeout.
156
157   """
158   def decorator(f):
159     name = f.__name__
160     assert name.startswith("call_")
161     _TIMEOUTS[name[len("call_"):]] = secs
162     return f
163   return decorator
164
165
166 def RunWithRPC(fn):
167   """RPC-wrapper decorator.
168
169   When applied to a function, it runs it with the RPC system
170   initialized, and it shutsdown the system afterwards. This means the
171   function must be called without RPC being initialized.
172
173   """
174   def wrapper(*args, **kwargs):
175     Init()
176     try:
177       return fn(*args, **kwargs)
178     finally:
179       Shutdown()
180   return wrapper
181
182
183 class RpcResult(object):
184   """RPC Result class.
185
186   This class holds an RPC result. It is needed since in multi-node
187   calls we can't raise an exception just because one one out of many
188   failed, and therefore we use this class to encapsulate the result.
189
190   @ivar data: the data payload, for successful results, or None
191   @ivar call: the name of the RPC call
192   @ivar node: the name of the node to which we made the call
193   @ivar offline: whether the operation failed because the node was
194       offline, as opposed to actual failure; offline=True will always
195       imply failed=True, in order to allow simpler checking if
196       the user doesn't care about the exact failure mode
197   @ivar fail_msg: the error message if the call failed
198
199   """
200   def __init__(self, data=None, failed=False, offline=False,
201                call=None, node=None):
202     self.offline = offline
203     self.call = call
204     self.node = node
205
206     if offline:
207       self.fail_msg = "Node is marked offline"
208       self.data = self.payload = None
209     elif failed:
210       self.fail_msg = self._EnsureErr(data)
211       self.data = self.payload = None
212     else:
213       self.data = data
214       if not isinstance(self.data, (tuple, list)):
215         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216                          type(self.data))
217         self.payload = None
218       elif len(data) != 2:
219         self.fail_msg = ("RPC layer error: invalid result length (%d), "
220                          "expected 2" % len(self.data))
221         self.payload = None
222       elif not self.data[0]:
223         self.fail_msg = self._EnsureErr(self.data[1])
224         self.payload = None
225       else:
226         # finally success
227         self.fail_msg = None
228         self.payload = data[1]
229
230     for attr_name in ["call", "data", "fail_msg",
231                       "node", "offline", "payload"]:
232       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233
234   @staticmethod
235   def _EnsureErr(val):
236     """Helper to ensure we return a 'True' value for error."""
237     if val:
238       return val
239     else:
240       return "No error information"
241
242   def Raise(self, msg, prereq=False, ecode=None):
243     """If the result has failed, raise an OpExecError.
244
245     This is used so that LU code doesn't have to check for each
246     result, but instead can call this function.
247
248     """
249     if not self.fail_msg:
250       return
251
252     if not msg: # one could pass None for default message
253       msg = ("Call '%s' to node '%s' has failed: %s" %
254              (self.call, self.node, self.fail_msg))
255     else:
256       msg = "%s: %s" % (msg, self.fail_msg)
257     if prereq:
258       ec = errors.OpPrereqError
259     else:
260       ec = errors.OpExecError
261     if ecode is not None:
262       args = (msg, ecode)
263     else:
264       args = (msg, )
265     raise ec(*args) # pylint: disable-msg=W0142
266
267
268 def _AddressLookup(node_list,
269                    ssc=ssconf.SimpleStore,
270                    nslookup_fn=netutils.Hostname.GetIP):
271   """Return addresses for given node names.
272
273   @type node_list: list
274   @param node_list: List of node names
275   @type ssc: class
276   @param ssc: SimpleStore class that is used to obtain node->ip mappings
277   @type nslookup_fn: callable
278   @param nslookup_fn: function use to do NS lookup
279   @rtype: list of addresses and/or None's
280   @returns: List of corresponding addresses, if found
281
282   """
283   ss = ssc()
284   iplist = ss.GetNodePrimaryIPList()
285   family = ss.GetPrimaryIPFamily()
286   addresses = []
287   ipmap = dict(entry.split() for entry in iplist)
288   for node in node_list:
289     address = ipmap.get(node)
290     if address is None:
291       address = nslookup_fn(node, family=family)
292     addresses.append(address)
293
294   return addresses
295
296
297 class Client:
298   """RPC Client class.
299
300   This class, given a (remote) method name, a list of parameters and a
301   list of nodes, will contact (in parallel) all nodes, and return a
302   dict of results (key: node name, value: result).
303
304   One current bug is that generic failure is still signaled by
305   'False' result, which is not good. This overloading of values can
306   cause bugs.
307
308   """
309   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311                                     " timeouts table")
312     self.procedure = procedure
313     self.body = body
314     self.port = port
315     self._request = {}
316     self._address_lookup_fn = address_lookup_fn
317
318   def ConnectList(self, node_list, address_list=None, read_timeout=None):
319     """Add a list of nodes to the target nodes.
320
321     @type node_list: list
322     @param node_list: the list of node names to connect
323     @type address_list: list or None
324     @keyword address_list: either None or a list with node addresses,
325         which must have the same length as the node list
326     @type read_timeout: int
327     @param read_timeout: overwrites default timeout for operation
328
329     """
330     if address_list is None:
331       # Always use IP address instead of node name
332       address_list = self._address_lookup_fn(node_list)
333
334     assert len(node_list) == len(address_list), \
335            "Name and address lists must have the same length"
336
337     for node, address in zip(node_list, address_list):
338       self.ConnectNode(node, address, read_timeout=read_timeout)
339
340   def ConnectNode(self, name, address=None, read_timeout=None):
341     """Add a node to the target list.
342
343     @type name: str
344     @param name: the node name
345     @type address: str
346     @param address: the node address, if known
347     @type read_timeout: int
348     @param read_timeout: overwrites default timeout for operation
349
350     """
351     if address is None:
352       # Always use IP address instead of node name
353       address = self._address_lookup_fn([name])[0]
354
355     assert(address is not None)
356
357     if read_timeout is None:
358       read_timeout = _TIMEOUTS[self.procedure]
359
360     self._request[name] = \
361       http.client.HttpClientRequest(str(address), self.port,
362                                     http.HTTP_PUT, str("/%s" % self.procedure),
363                                     headers=_RPC_CLIENT_HEADERS,
364                                     post_data=str(self.body),
365                                     read_timeout=read_timeout)
366
367   def GetResults(self, http_pool=None):
368     """Call nodes and return results.
369
370     @rtype: list
371     @return: List of RPC results
372
373     """
374     if not http_pool:
375       http_pool = _thread_local.GetHttpClientPool()
376
377     http_pool.ProcessRequests(self._request.values())
378
379     results = {}
380
381     for name, req in self._request.iteritems():
382       if req.success and req.resp_status_code == http.HTTP_OK:
383         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384                                   node=name, call=self.procedure)
385         continue
386
387       # TODO: Better error reporting
388       if req.error:
389         msg = req.error
390       else:
391         msg = req.resp_body
392
393       logging.error("RPC error in %s from node %s: %s",
394                     self.procedure, name, msg)
395       results[name] = RpcResult(data=msg, failed=True, node=name,
396                                 call=self.procedure)
397
398     return results
399
400
401 def _EncodeImportExportIO(ieio, ieioargs):
402   """Encodes import/export I/O information.
403
404   """
405   if ieio == constants.IEIO_RAW_DISK:
406     assert len(ieioargs) == 1
407     return (ieioargs[0].ToDict(), )
408
409   if ieio == constants.IEIO_SCRIPT:
410     assert len(ieioargs) == 2
411     return (ieioargs[0].ToDict(), ieioargs[1])
412
413   return ieioargs
414
415
416 class RpcRunner(object):
417   """RPC runner class"""
418
419   def __init__(self, cfg):
420     """Initialized the rpc runner.
421
422     @type cfg:  C{config.ConfigWriter}
423     @param cfg: the configuration object that will be used to get data
424                 about the cluster
425
426     """
427     self._cfg = cfg
428     self.port = netutils.GetDaemonPort(constants.NODED)
429
430   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431     """Convert the given instance to a dict.
432
433     This is done via the instance's ToDict() method and additionally
434     we fill the hvparams with the cluster defaults.
435
436     @type instance: L{objects.Instance}
437     @param instance: an Instance object
438     @type hvp: dict or None
439     @param hvp: a dictionary with overridden hypervisor parameters
440     @type bep: dict or None
441     @param bep: a dictionary with overridden backend parameters
442     @type osp: dict or None
443     @param osp: a dictionary with overridden os parameters
444     @rtype: dict
445     @return: the instance dict, with the hvparams filled with the
446         cluster defaults
447
448     """
449     idict = instance.ToDict()
450     cluster = self._cfg.GetClusterInfo()
451     idict["hvparams"] = cluster.FillHV(instance)
452     if hvp is not None:
453       idict["hvparams"].update(hvp)
454     idict["beparams"] = cluster.FillBE(instance)
455     if bep is not None:
456       idict["beparams"].update(bep)
457     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458     if osp is not None:
459       idict["osparams"].update(osp)
460     for nic in idict["nics"]:
461       nic['nicparams'] = objects.FillDict(
462         cluster.nicparams[constants.PP_DEFAULT],
463         nic['nicparams'])
464     return idict
465
466   def _ConnectList(self, client, node_list, call, read_timeout=None):
467     """Helper for computing node addresses.
468
469     @type client: L{ganeti.rpc.Client}
470     @param client: a C{Client} instance
471     @type node_list: list
472     @param node_list: the node list we should connect
473     @type call: string
474     @param call: the name of the remote procedure call, for filling in
475         correctly any eventual offline nodes' results
476     @type read_timeout: int
477     @param read_timeout: overwrites the default read timeout for the
478         given operation
479
480     """
481     all_nodes = self._cfg.GetAllNodesInfo()
482     name_list = []
483     addr_list = []
484     skip_dict = {}
485     for node in node_list:
486       if node in all_nodes:
487         if all_nodes[node].offline:
488           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489           continue
490         val = all_nodes[node].primary_ip
491       else:
492         val = None
493       addr_list.append(val)
494       name_list.append(node)
495     if name_list:
496       client.ConnectList(name_list, address_list=addr_list,
497                          read_timeout=read_timeout)
498     return skip_dict
499
500   def _ConnectNode(self, client, node, call, read_timeout=None):
501     """Helper for computing one node's address.
502
503     @type client: L{ganeti.rpc.Client}
504     @param client: a C{Client} instance
505     @type node: str
506     @param node: the node we should connect
507     @type call: string
508     @param call: the name of the remote procedure call, for filling in
509         correctly any eventual offline nodes' results
510     @type read_timeout: int
511     @param read_timeout: overwrites the default read timeout for the
512         given operation
513
514     """
515     node_info = self._cfg.GetNodeInfo(node)
516     if node_info is not None:
517       if node_info.offline:
518         return RpcResult(node=node, offline=True, call=call)
519       addr = node_info.primary_ip
520     else:
521       addr = None
522     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523
524   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525     """Helper for making a multi-node call
526
527     """
528     body = serializer.DumpJson(args, indent=False)
529     c = Client(procedure, body, self.port)
530     skip_dict = self._ConnectList(c, node_list, procedure,
531                                   read_timeout=read_timeout)
532     skip_dict.update(c.GetResults())
533     return skip_dict
534
535   @classmethod
536   def _StaticMultiNodeCall(cls, node_list, procedure, args,
537                            address_list=None, read_timeout=None):
538     """Helper for making a multi-node static call
539
540     """
541     body = serializer.DumpJson(args, indent=False)
542     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
543     c.ConnectList(node_list, address_list=address_list,
544                   read_timeout=read_timeout)
545     return c.GetResults()
546
547   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548     """Helper for making a single-node call
549
550     """
551     body = serializer.DumpJson(args, indent=False)
552     c = Client(procedure, body, self.port)
553     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554     if result is None:
555       # we did connect, node is not offline
556       result = c.GetResults()[node]
557     return result
558
559   @classmethod
560   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561     """Helper for making a single-node static call
562
563     """
564     body = serializer.DumpJson(args, indent=False)
565     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
566     c.ConnectNode(node, read_timeout=read_timeout)
567     return c.GetResults()[node]
568
569   @staticmethod
570   def _Compress(data):
571     """Compresses a string for transport over RPC.
572
573     Small amounts of data are not compressed.
574
575     @type data: str
576     @param data: Data
577     @rtype: tuple
578     @return: Encoded data to send
579
580     """
581     # Small amounts of data are not compressed
582     if len(data) < 512:
583       return (constants.RPC_ENCODING_NONE, data)
584
585     # Compress with zlib and encode in base64
586     return (constants.RPC_ENCODING_ZLIB_BASE64,
587             base64.b64encode(zlib.compress(data, 3)))
588
589   #
590   # Begin RPC calls
591   #
592
593   @_RpcTimeout(_TMO_URGENT)
594   def call_lv_list(self, node_list, vg_name):
595     """Gets the logical volumes present in a given volume group.
596
597     This is a multi-node call.
598
599     """
600     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
601
602   @_RpcTimeout(_TMO_URGENT)
603   def call_vg_list(self, node_list):
604     """Gets the volume group list.
605
606     This is a multi-node call.
607
608     """
609     return self._MultiNodeCall(node_list, "vg_list", [])
610
611   @_RpcTimeout(_TMO_NORMAL)
612   def call_storage_list(self, node_list, su_name, su_args, name, fields):
613     """Get list of storage units.
614
615     This is a multi-node call.
616
617     """
618     return self._MultiNodeCall(node_list, "storage_list",
619                                [su_name, su_args, name, fields])
620
621   @_RpcTimeout(_TMO_NORMAL)
622   def call_storage_modify(self, node, su_name, su_args, name, changes):
623     """Modify a storage unit.
624
625     This is a single-node call.
626
627     """
628     return self._SingleNodeCall(node, "storage_modify",
629                                 [su_name, su_args, name, changes])
630
631   @_RpcTimeout(_TMO_NORMAL)
632   def call_storage_execute(self, node, su_name, su_args, name, op):
633     """Executes an operation on a storage unit.
634
635     This is a single-node call.
636
637     """
638     return self._SingleNodeCall(node, "storage_execute",
639                                 [su_name, su_args, name, op])
640
641   @_RpcTimeout(_TMO_URGENT)
642   def call_bridges_exist(self, node, bridges_list):
643     """Checks if a node has all the bridges given.
644
645     This method checks if all bridges given in the bridges_list are
646     present on the remote node, so that an instance that uses interfaces
647     on those bridges can be started.
648
649     This is a single-node call.
650
651     """
652     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
653
654   @_RpcTimeout(_TMO_NORMAL)
655   def call_instance_start(self, node, instance, hvp, bep):
656     """Starts an instance.
657
658     This is a single-node call.
659
660     """
661     idict = self._InstDict(instance, hvp=hvp, bep=bep)
662     return self._SingleNodeCall(node, "instance_start", [idict])
663
664   @_RpcTimeout(_TMO_NORMAL)
665   def call_instance_shutdown(self, node, instance, timeout):
666     """Stops an instance.
667
668     This is a single-node call.
669
670     """
671     return self._SingleNodeCall(node, "instance_shutdown",
672                                 [self._InstDict(instance), timeout])
673
674   @_RpcTimeout(_TMO_NORMAL)
675   def call_migration_info(self, node, instance):
676     """Gather the information necessary to prepare an instance migration.
677
678     This is a single-node call.
679
680     @type node: string
681     @param node: the node on which the instance is currently running
682     @type instance: C{objects.Instance}
683     @param instance: the instance definition
684
685     """
686     return self._SingleNodeCall(node, "migration_info",
687                                 [self._InstDict(instance)])
688
689   @_RpcTimeout(_TMO_NORMAL)
690   def call_accept_instance(self, node, instance, info, target):
691     """Prepare a node to accept an instance.
692
693     This is a single-node call.
694
695     @type node: string
696     @param node: the target node for the migration
697     @type instance: C{objects.Instance}
698     @param instance: the instance definition
699     @type info: opaque/hypervisor specific (string/data)
700     @param info: result for the call_migration_info call
701     @type target: string
702     @param target: target hostname (usually ip address) (on the node itself)
703
704     """
705     return self._SingleNodeCall(node, "accept_instance",
706                                 [self._InstDict(instance), info, target])
707
708   @_RpcTimeout(_TMO_NORMAL)
709   def call_finalize_migration(self, node, instance, info, success):
710     """Finalize any target-node migration specific operation.
711
712     This is called both in case of a successful migration and in case of error
713     (in which case it should abort the migration).
714
715     This is a single-node call.
716
717     @type node: string
718     @param node: the target node for the migration
719     @type instance: C{objects.Instance}
720     @param instance: the instance definition
721     @type info: opaque/hypervisor specific (string/data)
722     @param info: result for the call_migration_info call
723     @type success: boolean
724     @param success: whether the migration was a success or a failure
725
726     """
727     return self._SingleNodeCall(node, "finalize_migration",
728                                 [self._InstDict(instance), info, success])
729
730   @_RpcTimeout(_TMO_SLOW)
731   def call_instance_migrate(self, node, instance, target, live):
732     """Migrate an instance.
733
734     This is a single-node call.
735
736     @type node: string
737     @param node: the node on which the instance is currently running
738     @type instance: C{objects.Instance}
739     @param instance: the instance definition
740     @type target: string
741     @param target: the target node name
742     @type live: boolean
743     @param live: whether the migration should be done live or not (the
744         interpretation of this parameter is left to the hypervisor)
745
746     """
747     return self._SingleNodeCall(node, "instance_migrate",
748                                 [self._InstDict(instance), target, live])
749
750   @_RpcTimeout(_TMO_NORMAL)
751   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
752     """Reboots an instance.
753
754     This is a single-node call.
755
756     """
757     return self._SingleNodeCall(node, "instance_reboot",
758                                 [self._InstDict(inst), reboot_type,
759                                  shutdown_timeout])
760
761   @_RpcTimeout(_TMO_1DAY)
762   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
763     """Installs an OS on the given instance.
764
765     This is a single-node call.
766
767     """
768     return self._SingleNodeCall(node, "instance_os_add",
769                                 [self._InstDict(inst, osp=osparams),
770                                  reinstall, debug])
771
772   @_RpcTimeout(_TMO_SLOW)
773   def call_instance_run_rename(self, node, inst, old_name, debug):
774     """Run the OS rename script for an instance.
775
776     This is a single-node call.
777
778     """
779     return self._SingleNodeCall(node, "instance_run_rename",
780                                 [self._InstDict(inst), old_name, debug])
781
782   @_RpcTimeout(_TMO_URGENT)
783   def call_instance_info(self, node, instance, hname):
784     """Returns information about a single instance.
785
786     This is a single-node call.
787
788     @type node: list
789     @param node: the list of nodes to query
790     @type instance: string
791     @param instance: the instance name
792     @type hname: string
793     @param hname: the hypervisor type of the instance
794
795     """
796     return self._SingleNodeCall(node, "instance_info", [instance, hname])
797
798   @_RpcTimeout(_TMO_NORMAL)
799   def call_instance_migratable(self, node, instance):
800     """Checks whether the given instance can be migrated.
801
802     This is a single-node call.
803
804     @param node: the node to query
805     @type instance: L{objects.Instance}
806     @param instance: the instance to check
807
808
809     """
810     return self._SingleNodeCall(node, "instance_migratable",
811                                 [self._InstDict(instance)])
812
813   @_RpcTimeout(_TMO_URGENT)
814   def call_all_instances_info(self, node_list, hypervisor_list):
815     """Returns information about all instances on the given nodes.
816
817     This is a multi-node call.
818
819     @type node_list: list
820     @param node_list: the list of nodes to query
821     @type hypervisor_list: list
822     @param hypervisor_list: the hypervisors to query for instances
823
824     """
825     return self._MultiNodeCall(node_list, "all_instances_info",
826                                [hypervisor_list])
827
828   @_RpcTimeout(_TMO_URGENT)
829   def call_instance_list(self, node_list, hypervisor_list):
830     """Returns the list of running instances on a given node.
831
832     This is a multi-node call.
833
834     @type node_list: list
835     @param node_list: the list of nodes to query
836     @type hypervisor_list: list
837     @param hypervisor_list: the hypervisors to query for instances
838
839     """
840     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
841
842   @_RpcTimeout(_TMO_FAST)
843   def call_node_tcp_ping(self, node, source, target, port, timeout,
844                          live_port_needed):
845     """Do a TcpPing on the remote node
846
847     This is a single-node call.
848
849     """
850     return self._SingleNodeCall(node, "node_tcp_ping",
851                                 [source, target, port, timeout,
852                                  live_port_needed])
853
854   @_RpcTimeout(_TMO_FAST)
855   def call_node_has_ip_address(self, node, address):
856     """Checks if a node has the given IP address.
857
858     This is a single-node call.
859
860     """
861     return self._SingleNodeCall(node, "node_has_ip_address", [address])
862
863   @_RpcTimeout(_TMO_URGENT)
864   def call_node_info(self, node_list, vg_name, hypervisor_type):
865     """Return node information.
866
867     This will return memory information and volume group size and free
868     space.
869
870     This is a multi-node call.
871
872     @type node_list: list
873     @param node_list: the list of nodes to query
874     @type vg_name: C{string}
875     @param vg_name: the name of the volume group to ask for disk space
876         information
877     @type hypervisor_type: C{str}
878     @param hypervisor_type: the name of the hypervisor to ask for
879         memory information
880
881     """
882     return self._MultiNodeCall(node_list, "node_info",
883                                [vg_name, hypervisor_type])
884
885   @_RpcTimeout(_TMO_NORMAL)
886   def call_etc_hosts_modify(self, node, mode, name, ip):
887     """Modify hosts file with name
888
889     @type node: string
890     @param node: The node to call
891     @type mode: string
892     @param mode: The mode to operate. Currently "add" or "remove"
893     @type name: string
894     @param name: The host name to be modified
895     @type ip: string
896     @param ip: The ip of the entry (just valid if mode is "add")
897
898     """
899     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
900
901   @_RpcTimeout(_TMO_NORMAL)
902   def call_node_verify(self, node_list, checkdict, cluster_name):
903     """Request verification of given parameters.
904
905     This is a multi-node call.
906
907     """
908     return self._MultiNodeCall(node_list, "node_verify",
909                                [checkdict, cluster_name])
910
911   @classmethod
912   @_RpcTimeout(_TMO_FAST)
913   def call_node_start_master(cls, node, start_daemons, no_voting):
914     """Tells a node to activate itself as a master.
915
916     This is a single-node call.
917
918     """
919     return cls._StaticSingleNodeCall(node, "node_start_master",
920                                      [start_daemons, no_voting])
921
922   @classmethod
923   @_RpcTimeout(_TMO_FAST)
924   def call_node_stop_master(cls, node, stop_daemons):
925     """Tells a node to demote itself from master status.
926
927     This is a single-node call.
928
929     """
930     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
931
932   @classmethod
933   @_RpcTimeout(_TMO_URGENT)
934   def call_master_info(cls, node_list):
935     """Query master info.
936
937     This is a multi-node call.
938
939     """
940     # TODO: should this method query down nodes?
941     return cls._StaticMultiNodeCall(node_list, "master_info", [])
942
943   @classmethod
944   @_RpcTimeout(_TMO_URGENT)
945   def call_version(cls, node_list):
946     """Query node version.
947
948     This is a multi-node call.
949
950     """
951     return cls._StaticMultiNodeCall(node_list, "version", [])
952
953   @_RpcTimeout(_TMO_NORMAL)
954   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
955     """Request creation of a given block device.
956
957     This is a single-node call.
958
959     """
960     return self._SingleNodeCall(node, "blockdev_create",
961                                 [bdev.ToDict(), size, owner, on_primary, info])
962
963   @_RpcTimeout(_TMO_SLOW)
964   def call_blockdev_wipe(self, node, bdev, offset, size):
965     """Request wipe at given offset with given size of a block device.
966
967     This is a single-node call.
968
969     """
970     return self._SingleNodeCall(node, "blockdev_wipe",
971                                 [bdev.ToDict(), offset, size])
972
973   @_RpcTimeout(_TMO_NORMAL)
974   def call_blockdev_remove(self, node, bdev):
975     """Request removal of a given block device.
976
977     This is a single-node call.
978
979     """
980     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
981
982   @_RpcTimeout(_TMO_NORMAL)
983   def call_blockdev_rename(self, node, devlist):
984     """Request rename of the given block devices.
985
986     This is a single-node call.
987
988     """
989     return self._SingleNodeCall(node, "blockdev_rename",
990                                 [(d.ToDict(), uid) for d, uid in devlist])
991
992   @_RpcTimeout(_TMO_NORMAL)
993   def call_blockdev_pause_resume_sync(self, node, disks, pause):
994     """Request a pause/resume of given block device.
995
996     This is a single-node call.
997
998     """
999     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1000                                 [[bdev.ToDict() for bdev in disks], pause])
1001
1002   @_RpcTimeout(_TMO_NORMAL)
1003   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1004     """Request assembling of a given block device.
1005
1006     This is a single-node call.
1007
1008     """
1009     return self._SingleNodeCall(node, "blockdev_assemble",
1010                                 [disk.ToDict(), owner, on_primary, idx])
1011
1012   @_RpcTimeout(_TMO_NORMAL)
1013   def call_blockdev_shutdown(self, node, disk):
1014     """Request shutdown of a given block device.
1015
1016     This is a single-node call.
1017
1018     """
1019     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1020
1021   @_RpcTimeout(_TMO_NORMAL)
1022   def call_blockdev_addchildren(self, node, bdev, ndevs):
1023     """Request adding a list of children to a (mirroring) device.
1024
1025     This is a single-node call.
1026
1027     """
1028     return self._SingleNodeCall(node, "blockdev_addchildren",
1029                                 [bdev.ToDict(),
1030                                  [disk.ToDict() for disk in ndevs]])
1031
1032   @_RpcTimeout(_TMO_NORMAL)
1033   def call_blockdev_removechildren(self, node, bdev, ndevs):
1034     """Request removing a list of children from a (mirroring) device.
1035
1036     This is a single-node call.
1037
1038     """
1039     return self._SingleNodeCall(node, "blockdev_removechildren",
1040                                 [bdev.ToDict(),
1041                                  [disk.ToDict() for disk in ndevs]])
1042
1043   @_RpcTimeout(_TMO_NORMAL)
1044   def call_blockdev_getmirrorstatus(self, node, disks):
1045     """Request status of a (mirroring) device.
1046
1047     This is a single-node call.
1048
1049     """
1050     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1051                                   [dsk.ToDict() for dsk in disks])
1052     if not result.fail_msg:
1053       result.payload = [objects.BlockDevStatus.FromDict(i)
1054                         for i in result.payload]
1055     return result
1056
1057   @_RpcTimeout(_TMO_NORMAL)
1058   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1059     """Request status of (mirroring) devices from multiple nodes.
1060
1061     This is a multi-node call.
1062
1063     """
1064     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1065                                  [dict((name, [dsk.ToDict() for dsk in disks])
1066                                        for name, disks in node_disks.items())])
1067     for nres in result.values():
1068       if nres.fail_msg:
1069         continue
1070
1071       for idx, (success, status) in enumerate(nres.payload):
1072         if success:
1073           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1074
1075     return result
1076
1077   @_RpcTimeout(_TMO_NORMAL)
1078   def call_blockdev_find(self, node, disk):
1079     """Request identification of a given block device.
1080
1081     This is a single-node call.
1082
1083     """
1084     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1085     if not result.fail_msg and result.payload is not None:
1086       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1087     return result
1088
1089   @_RpcTimeout(_TMO_NORMAL)
1090   def call_blockdev_close(self, node, instance_name, disks):
1091     """Closes the given block devices.
1092
1093     This is a single-node call.
1094
1095     """
1096     params = [instance_name, [cf.ToDict() for cf in disks]]
1097     return self._SingleNodeCall(node, "blockdev_close", params)
1098
1099   @_RpcTimeout(_TMO_NORMAL)
1100   def call_blockdev_getsize(self, node, disks):
1101     """Returns the size of the given disks.
1102
1103     This is a single-node call.
1104
1105     """
1106     params = [[cf.ToDict() for cf in disks]]
1107     return self._SingleNodeCall(node, "blockdev_getsize", params)
1108
1109   @_RpcTimeout(_TMO_NORMAL)
1110   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1111     """Disconnects the network of the given drbd devices.
1112
1113     This is a multi-node call.
1114
1115     """
1116     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1117                                [nodes_ip, [cf.ToDict() for cf in disks]])
1118
1119   @_RpcTimeout(_TMO_NORMAL)
1120   def call_drbd_attach_net(self, node_list, nodes_ip,
1121                            disks, instance_name, multimaster):
1122     """Disconnects the given drbd devices.
1123
1124     This is a multi-node call.
1125
1126     """
1127     return self._MultiNodeCall(node_list, "drbd_attach_net",
1128                                [nodes_ip, [cf.ToDict() for cf in disks],
1129                                 instance_name, multimaster])
1130
1131   @_RpcTimeout(_TMO_SLOW)
1132   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1133     """Waits for the synchronization of drbd devices is complete.
1134
1135     This is a multi-node call.
1136
1137     """
1138     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1139                                [nodes_ip, [cf.ToDict() for cf in disks]])
1140
1141   @_RpcTimeout(_TMO_URGENT)
1142   def call_drbd_helper(self, node_list):
1143     """Gets drbd helper.
1144
1145     This is a multi-node call.
1146
1147     """
1148     return self._MultiNodeCall(node_list, "drbd_helper", [])
1149
1150   @classmethod
1151   @_RpcTimeout(_TMO_NORMAL)
1152   def call_upload_file(cls, node_list, file_name, address_list=None):
1153     """Upload a file.
1154
1155     The node will refuse the operation in case the file is not on the
1156     approved file list.
1157
1158     This is a multi-node call.
1159
1160     @type node_list: list
1161     @param node_list: the list of node names to upload to
1162     @type file_name: str
1163     @param file_name: the filename to upload
1164     @type address_list: list or None
1165     @keyword address_list: an optional list of node addresses, in order
1166         to optimize the RPC speed
1167
1168     """
1169     file_contents = utils.ReadFile(file_name)
1170     data = cls._Compress(file_contents)
1171     st = os.stat(file_name)
1172     getents = runtime.GetEnts()
1173     params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1174               getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1175     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1176                                     address_list=address_list)
1177
1178   @classmethod
1179   @_RpcTimeout(_TMO_NORMAL)
1180   def call_write_ssconf_files(cls, node_list, values):
1181     """Write ssconf files.
1182
1183     This is a multi-node call.
1184
1185     """
1186     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1187
1188   @_RpcTimeout(_TMO_NORMAL)
1189   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1190     """Runs OOB.
1191
1192     This is a single-node call.
1193
1194     """
1195     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1196                                                   remote_node, timeout])
1197
1198   @_RpcTimeout(_TMO_FAST)
1199   def call_os_diagnose(self, node_list):
1200     """Request a diagnose of OS definitions.
1201
1202     This is a multi-node call.
1203
1204     """
1205     return self._MultiNodeCall(node_list, "os_diagnose", [])
1206
1207   @_RpcTimeout(_TMO_FAST)
1208   def call_os_get(self, node, name):
1209     """Returns an OS definition.
1210
1211     This is a single-node call.
1212
1213     """
1214     result = self._SingleNodeCall(node, "os_get", [name])
1215     if not result.fail_msg and isinstance(result.payload, dict):
1216       result.payload = objects.OS.FromDict(result.payload)
1217     return result
1218
1219   @_RpcTimeout(_TMO_FAST)
1220   def call_os_validate(self, required, nodes, name, checks, params):
1221     """Run a validation routine for a given OS.
1222
1223     This is a multi-node call.
1224
1225     """
1226     return self._MultiNodeCall(nodes, "os_validate",
1227                                [required, name, checks, params])
1228
1229   @_RpcTimeout(_TMO_NORMAL)
1230   def call_hooks_runner(self, node_list, hpath, phase, env):
1231     """Call the hooks runner.
1232
1233     Args:
1234       - op: the OpCode instance
1235       - env: a dictionary with the environment
1236
1237     This is a multi-node call.
1238
1239     """
1240     params = [hpath, phase, env]
1241     return self._MultiNodeCall(node_list, "hooks_runner", params)
1242
1243   @_RpcTimeout(_TMO_NORMAL)
1244   def call_iallocator_runner(self, node, name, idata):
1245     """Call an iallocator on a remote node
1246
1247     Args:
1248       - name: the iallocator name
1249       - input: the json-encoded input string
1250
1251     This is a single-node call.
1252
1253     """
1254     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1255
1256   @_RpcTimeout(_TMO_NORMAL)
1257   def call_blockdev_grow(self, node, cf_bdev, amount):
1258     """Request a snapshot of the given block device.
1259
1260     This is a single-node call.
1261
1262     """
1263     return self._SingleNodeCall(node, "blockdev_grow",
1264                                 [cf_bdev.ToDict(), amount])
1265
1266   @_RpcTimeout(_TMO_1DAY)
1267   def call_blockdev_export(self, node, cf_bdev,
1268                            dest_node, dest_path, cluster_name):
1269     """Export a given disk to another node.
1270
1271     This is a single-node call.
1272
1273     """
1274     return self._SingleNodeCall(node, "blockdev_export",
1275                                 [cf_bdev.ToDict(), dest_node, dest_path,
1276                                  cluster_name])
1277
1278   @_RpcTimeout(_TMO_NORMAL)
1279   def call_blockdev_snapshot(self, node, cf_bdev):
1280     """Request a snapshot of the given block device.
1281
1282     This is a single-node call.
1283
1284     """
1285     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1286
1287   @_RpcTimeout(_TMO_NORMAL)
1288   def call_finalize_export(self, node, instance, snap_disks):
1289     """Request the completion of an export operation.
1290
1291     This writes the export config file, etc.
1292
1293     This is a single-node call.
1294
1295     """
1296     flat_disks = []
1297     for disk in snap_disks:
1298       if isinstance(disk, bool):
1299         flat_disks.append(disk)
1300       else:
1301         flat_disks.append(disk.ToDict())
1302
1303     return self._SingleNodeCall(node, "finalize_export",
1304                                 [self._InstDict(instance), flat_disks])
1305
1306   @_RpcTimeout(_TMO_FAST)
1307   def call_export_info(self, node, path):
1308     """Queries the export information in a given path.
1309
1310     This is a single-node call.
1311
1312     """
1313     return self._SingleNodeCall(node, "export_info", [path])
1314
1315   @_RpcTimeout(_TMO_FAST)
1316   def call_export_list(self, node_list):
1317     """Gets the stored exports list.
1318
1319     This is a multi-node call.
1320
1321     """
1322     return self._MultiNodeCall(node_list, "export_list", [])
1323
1324   @_RpcTimeout(_TMO_FAST)
1325   def call_export_remove(self, node, export):
1326     """Requests removal of a given export.
1327
1328     This is a single-node call.
1329
1330     """
1331     return self._SingleNodeCall(node, "export_remove", [export])
1332
1333   @classmethod
1334   @_RpcTimeout(_TMO_NORMAL)
1335   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1336     """Requests a node to clean the cluster information it has.
1337
1338     This will remove the configuration information from the ganeti data
1339     dir.
1340
1341     This is a single-node call.
1342
1343     """
1344     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1345                                      [modify_ssh_setup])
1346
1347   @_RpcTimeout(_TMO_FAST)
1348   def call_node_volumes(self, node_list):
1349     """Gets all volumes on node(s).
1350
1351     This is a multi-node call.
1352
1353     """
1354     return self._MultiNodeCall(node_list, "node_volumes", [])
1355
1356   @_RpcTimeout(_TMO_FAST)
1357   def call_node_demote_from_mc(self, node):
1358     """Demote a node from the master candidate role.
1359
1360     This is a single-node call.
1361
1362     """
1363     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1364
1365   @_RpcTimeout(_TMO_NORMAL)
1366   def call_node_powercycle(self, node, hypervisor):
1367     """Tries to powercycle a node.
1368
1369     This is a single-node call.
1370
1371     """
1372     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1373
1374   @_RpcTimeout(None)
1375   def call_test_delay(self, node_list, duration):
1376     """Sleep for a fixed time on given node(s).
1377
1378     This is a multi-node call.
1379
1380     """
1381     return self._MultiNodeCall(node_list, "test_delay", [duration],
1382                                read_timeout=int(duration + 5))
1383
1384   @_RpcTimeout(_TMO_FAST)
1385   def call_file_storage_dir_create(self, node, file_storage_dir):
1386     """Create the given file storage directory.
1387
1388     This is a single-node call.
1389
1390     """
1391     return self._SingleNodeCall(node, "file_storage_dir_create",
1392                                 [file_storage_dir])
1393
1394   @_RpcTimeout(_TMO_FAST)
1395   def call_file_storage_dir_remove(self, node, file_storage_dir):
1396     """Remove the given file storage directory.
1397
1398     This is a single-node call.
1399
1400     """
1401     return self._SingleNodeCall(node, "file_storage_dir_remove",
1402                                 [file_storage_dir])
1403
1404   @_RpcTimeout(_TMO_FAST)
1405   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1406                                    new_file_storage_dir):
1407     """Rename file storage directory.
1408
1409     This is a single-node call.
1410
1411     """
1412     return self._SingleNodeCall(node, "file_storage_dir_rename",
1413                                 [old_file_storage_dir, new_file_storage_dir])
1414
1415   @classmethod
1416   @_RpcTimeout(_TMO_URGENT)
1417   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1418     """Update job queue.
1419
1420     This is a multi-node call.
1421
1422     """
1423     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1424                                     [file_name, cls._Compress(content)],
1425                                     address_list=address_list)
1426
1427   @classmethod
1428   @_RpcTimeout(_TMO_NORMAL)
1429   def call_jobqueue_purge(cls, node):
1430     """Purge job queue.
1431
1432     This is a single-node call.
1433
1434     """
1435     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1436
1437   @classmethod
1438   @_RpcTimeout(_TMO_URGENT)
1439   def call_jobqueue_rename(cls, node_list, address_list, rename):
1440     """Rename a job queue file.
1441
1442     This is a multi-node call.
1443
1444     """
1445     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1446                                     address_list=address_list)
1447
1448   @_RpcTimeout(_TMO_NORMAL)
1449   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1450     """Validate the hypervisor params.
1451
1452     This is a multi-node call.
1453
1454     @type node_list: list
1455     @param node_list: the list of nodes to query
1456     @type hvname: string
1457     @param hvname: the hypervisor name
1458     @type hvparams: dict
1459     @param hvparams: the hypervisor parameters to be validated
1460
1461     """
1462     cluster = self._cfg.GetClusterInfo()
1463     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1464     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1465                                [hvname, hv_full])
1466
1467   @_RpcTimeout(_TMO_NORMAL)
1468   def call_x509_cert_create(self, node, validity):
1469     """Creates a new X509 certificate for SSL/TLS.
1470
1471     This is a single-node call.
1472
1473     @type validity: int
1474     @param validity: Validity in seconds
1475
1476     """
1477     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1478
1479   @_RpcTimeout(_TMO_NORMAL)
1480   def call_x509_cert_remove(self, node, name):
1481     """Removes a X509 certificate.
1482
1483     This is a single-node call.
1484
1485     @type name: string
1486     @param name: Certificate name
1487
1488     """
1489     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1490
1491   @_RpcTimeout(_TMO_NORMAL)
1492   def call_import_start(self, node, opts, instance, dest, dest_args):
1493     """Starts a listener for an import.
1494
1495     This is a single-node call.
1496
1497     @type node: string
1498     @param node: Node name
1499     @type instance: C{objects.Instance}
1500     @param instance: Instance object
1501
1502     """
1503     return self._SingleNodeCall(node, "import_start",
1504                                 [opts.ToDict(),
1505                                  self._InstDict(instance), dest,
1506                                  _EncodeImportExportIO(dest, dest_args)])
1507
1508   @_RpcTimeout(_TMO_NORMAL)
1509   def call_export_start(self, node, opts, host, port,
1510                         instance, source, source_args):
1511     """Starts an export daemon.
1512
1513     This is a single-node call.
1514
1515     @type node: string
1516     @param node: Node name
1517     @type instance: C{objects.Instance}
1518     @param instance: Instance object
1519
1520     """
1521     return self._SingleNodeCall(node, "export_start",
1522                                 [opts.ToDict(), host, port,
1523                                  self._InstDict(instance), source,
1524                                  _EncodeImportExportIO(source, source_args)])
1525
1526   @_RpcTimeout(_TMO_FAST)
1527   def call_impexp_status(self, node, names):
1528     """Gets the status of an import or export.
1529
1530     This is a single-node call.
1531
1532     @type node: string
1533     @param node: Node name
1534     @type names: List of strings
1535     @param names: Import/export names
1536     @rtype: List of L{objects.ImportExportStatus} instances
1537     @return: Returns a list of the state of each named import/export or None if
1538              a status couldn't be retrieved
1539
1540     """
1541     result = self._SingleNodeCall(node, "impexp_status", [names])
1542
1543     if not result.fail_msg:
1544       decoded = []
1545
1546       for i in result.payload:
1547         if i is None:
1548           decoded.append(None)
1549           continue
1550         decoded.append(objects.ImportExportStatus.FromDict(i))
1551
1552       result.payload = decoded
1553
1554     return result
1555
1556   @_RpcTimeout(_TMO_NORMAL)
1557   def call_impexp_abort(self, node, name):
1558     """Aborts an import or export.
1559
1560     This is a single-node call.
1561
1562     @type node: string
1563     @param node: Node name
1564     @type name: string
1565     @param name: Import/export name
1566
1567     """
1568     return self._SingleNodeCall(node, "impexp_abort", [name])
1569
1570   @_RpcTimeout(_TMO_NORMAL)
1571   def call_impexp_cleanup(self, node, name):
1572     """Cleans up after an import or export.
1573
1574     This is a single-node call.
1575
1576     @type node: string
1577     @param node: Node name
1578     @type name: string
1579     @param name: Import/export name
1580
1581     """
1582     return self._SingleNodeCall(node, "impexp_cleanup", [name])