DeprecationWarning fixes for pylint
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49
50 # pylint has a bug here, doesn't see this import
51 import ganeti.http.client  # pylint: disable=W0611
52
53
54 # Timeout for connecting to nodes (seconds)
55 _RPC_CONNECT_TIMEOUT = 5
56
57 _RPC_CLIENT_HEADERS = [
58   "Content-type: %s" % http.HTTP_APP_JSON,
59   "Expect:",
60   ]
61
62 # Various time constants for the timeout table
63 _TMO_URGENT = 60 # one minute
64 _TMO_FAST = 5 * 60 # five minutes
65 _TMO_NORMAL = 15 * 60 # 15 minutes
66 _TMO_SLOW = 3600 # one hour
67 _TMO_4HRS = 4 * 3600
68 _TMO_1DAY = 86400
69
70 # Timeout table that will be built later by decorators
71 # Guidelines for choosing timeouts:
72 # - call used during watcher: timeout -> 1min, _TMO_URGENT
73 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74 # - other calls: 15 min, _TMO_NORMAL
75 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76
77 _TIMEOUTS = {
78 }
79
80
81 def Init():
82   """Initializes the module-global HTTP client manager.
83
84   Must be called before using any RPC function and while exactly one thread is
85   running.
86
87   """
88   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89   # one thread running. This check is just a safety measure -- it doesn't
90   # cover all cases.
91   assert threading.activeCount() == 1, \
92          "Found more than one active thread when initializing pycURL"
93
94   logging.info("Using PycURL %s", pycurl.version)
95
96   pycurl.global_init(pycurl.GLOBAL_ALL)
97
98
99 def Shutdown():
100   """Stops the module-global HTTP client manager.
101
102   Must be called before quitting the program and while exactly one thread is
103   running.
104
105   """
106   pycurl.global_cleanup()
107
108
109 def _ConfigRpcCurl(curl):
110   noded_cert = str(constants.NODED_CERT_FILE)
111
112   curl.setopt(pycurl.FOLLOWLOCATION, False)
113   curl.setopt(pycurl.CAINFO, noded_cert)
114   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115   curl.setopt(pycurl.SSL_VERIFYPEER, True)
116   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117   curl.setopt(pycurl.SSLCERT, noded_cert)
118   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119   curl.setopt(pycurl.SSLKEY, noded_cert)
120   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121
122
123 # Aliasing this module avoids the following warning by epydoc: "Warning: No
124 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125 _threading = threading
126
127
128 class _RpcThreadLocal(_threading.local):
129   def GetHttpClientPool(self):
130     """Returns a per-thread HTTP client pool.
131
132     @rtype: L{http.client.HttpClientPool}
133
134     """
135     try:
136       pool = self.hcp
137     except AttributeError:
138       pool = http.client.HttpClientPool(_ConfigRpcCurl)
139       self.hcp = pool
140
141     return pool
142
143
144 # Remove module alias (see above)
145 del _threading
146
147
148 _thread_local = _RpcThreadLocal()
149
150
151 def _RpcTimeout(secs):
152   """Timeout decorator.
153
154   When applied to a rpc call_* function, it updates the global timeout
155   table with the given function/timeout.
156
157   """
158   def decorator(f):
159     name = f.__name__
160     assert name.startswith("call_")
161     _TIMEOUTS[name[len("call_"):]] = secs
162     return f
163   return decorator
164
165
166 def RunWithRPC(fn):
167   """RPC-wrapper decorator.
168
169   When applied to a function, it runs it with the RPC system
170   initialized, and it shutsdown the system afterwards. This means the
171   function must be called without RPC being initialized.
172
173   """
174   def wrapper(*args, **kwargs):
175     Init()
176     try:
177       return fn(*args, **kwargs)
178     finally:
179       Shutdown()
180   return wrapper
181
182
183 class RpcResult(object):
184   """RPC Result class.
185
186   This class holds an RPC result. It is needed since in multi-node
187   calls we can't raise an exception just because one one out of many
188   failed, and therefore we use this class to encapsulate the result.
189
190   @ivar data: the data payload, for successful results, or None
191   @ivar call: the name of the RPC call
192   @ivar node: the name of the node to which we made the call
193   @ivar offline: whether the operation failed because the node was
194       offline, as opposed to actual failure; offline=True will always
195       imply failed=True, in order to allow simpler checking if
196       the user doesn't care about the exact failure mode
197   @ivar fail_msg: the error message if the call failed
198
199   """
200   def __init__(self, data=None, failed=False, offline=False,
201                call=None, node=None):
202     self.offline = offline
203     self.call = call
204     self.node = node
205
206     if offline:
207       self.fail_msg = "Node is marked offline"
208       self.data = self.payload = None
209     elif failed:
210       self.fail_msg = self._EnsureErr(data)
211       self.data = self.payload = None
212     else:
213       self.data = data
214       if not isinstance(self.data, (tuple, list)):
215         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216                          type(self.data))
217         self.payload = None
218       elif len(data) != 2:
219         self.fail_msg = ("RPC layer error: invalid result length (%d), "
220                          "expected 2" % len(self.data))
221         self.payload = None
222       elif not self.data[0]:
223         self.fail_msg = self._EnsureErr(self.data[1])
224         self.payload = None
225       else:
226         # finally success
227         self.fail_msg = None
228         self.payload = data[1]
229
230     for attr_name in ["call", "data", "fail_msg",
231                       "node", "offline", "payload"]:
232       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233
234   @staticmethod
235   def _EnsureErr(val):
236     """Helper to ensure we return a 'True' value for error."""
237     if val:
238       return val
239     else:
240       return "No error information"
241
242   def Raise(self, msg, prereq=False, ecode=None):
243     """If the result has failed, raise an OpExecError.
244
245     This is used so that LU code doesn't have to check for each
246     result, but instead can call this function.
247
248     """
249     if not self.fail_msg:
250       return
251
252     if not msg: # one could pass None for default message
253       msg = ("Call '%s' to node '%s' has failed: %s" %
254              (self.call, self.node, self.fail_msg))
255     else:
256       msg = "%s: %s" % (msg, self.fail_msg)
257     if prereq:
258       ec = errors.OpPrereqError
259     else:
260       ec = errors.OpExecError
261     if ecode is not None:
262       args = (msg, ecode)
263     else:
264       args = (msg, )
265     raise ec(*args) # pylint: disable=W0142
266
267
268 def _AddressLookup(node_list,
269                    ssc=ssconf.SimpleStore,
270                    nslookup_fn=netutils.Hostname.GetIP):
271   """Return addresses for given node names.
272
273   @type node_list: list
274   @param node_list: List of node names
275   @type ssc: class
276   @param ssc: SimpleStore class that is used to obtain node->ip mappings
277   @type nslookup_fn: callable
278   @param nslookup_fn: function use to do NS lookup
279   @rtype: list of addresses and/or None's
280   @returns: List of corresponding addresses, if found
281
282   """
283   ss = ssc()
284   iplist = ss.GetNodePrimaryIPList()
285   family = ss.GetPrimaryIPFamily()
286   addresses = []
287   ipmap = dict(entry.split() for entry in iplist)
288   for node in node_list:
289     address = ipmap.get(node)
290     if address is None:
291       address = nslookup_fn(node, family=family)
292     addresses.append(address)
293
294   return addresses
295
296
297 class Client:
298   """RPC Client class.
299
300   This class, given a (remote) method name, a list of parameters and a
301   list of nodes, will contact (in parallel) all nodes, and return a
302   dict of results (key: node name, value: result).
303
304   One current bug is that generic failure is still signaled by
305   'False' result, which is not good. This overloading of values can
306   cause bugs.
307
308   """
309   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311                                     " timeouts table")
312     self.procedure = procedure
313     self.body = body
314     self.port = port
315     self._request = {}
316     self._address_lookup_fn = address_lookup_fn
317
318   def ConnectList(self, node_list, address_list=None, read_timeout=None):
319     """Add a list of nodes to the target nodes.
320
321     @type node_list: list
322     @param node_list: the list of node names to connect
323     @type address_list: list or None
324     @keyword address_list: either None or a list with node addresses,
325         which must have the same length as the node list
326     @type read_timeout: int
327     @param read_timeout: overwrites default timeout for operation
328
329     """
330     if address_list is None:
331       # Always use IP address instead of node name
332       address_list = self._address_lookup_fn(node_list)
333
334     assert len(node_list) == len(address_list), \
335            "Name and address lists must have the same length"
336
337     for node, address in zip(node_list, address_list):
338       self.ConnectNode(node, address, read_timeout=read_timeout)
339
340   def ConnectNode(self, name, address=None, read_timeout=None):
341     """Add a node to the target list.
342
343     @type name: str
344     @param name: the node name
345     @type address: str
346     @param address: the node address, if known
347     @type read_timeout: int
348     @param read_timeout: overwrites default timeout for operation
349
350     """
351     if address is None:
352       # Always use IP address instead of node name
353       address = self._address_lookup_fn([name])[0]
354
355     assert(address is not None)
356
357     if read_timeout is None:
358       read_timeout = _TIMEOUTS[self.procedure]
359
360     self._request[name] = \
361       http.client.HttpClientRequest(str(address), self.port,
362                                     http.HTTP_PUT, str("/%s" % self.procedure),
363                                     headers=_RPC_CLIENT_HEADERS,
364                                     post_data=str(self.body),
365                                     read_timeout=read_timeout)
366
367   def GetResults(self, http_pool=None):
368     """Call nodes and return results.
369
370     @rtype: list
371     @return: List of RPC results
372
373     """
374     if not http_pool:
375       http_pool = _thread_local.GetHttpClientPool()
376
377     http_pool.ProcessRequests(self._request.values())
378
379     results = {}
380
381     for name, req in self._request.iteritems():
382       if req.success and req.resp_status_code == http.HTTP_OK:
383         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384                                   node=name, call=self.procedure)
385         continue
386
387       # TODO: Better error reporting
388       if req.error:
389         msg = req.error
390       else:
391         msg = req.resp_body
392
393       logging.error("RPC error in %s from node %s: %s",
394                     self.procedure, name, msg)
395       results[name] = RpcResult(data=msg, failed=True, node=name,
396                                 call=self.procedure)
397
398     return results
399
400
401 def _EncodeImportExportIO(ieio, ieioargs):
402   """Encodes import/export I/O information.
403
404   """
405   if ieio == constants.IEIO_RAW_DISK:
406     assert len(ieioargs) == 1
407     return (ieioargs[0].ToDict(), )
408
409   if ieio == constants.IEIO_SCRIPT:
410     assert len(ieioargs) == 2
411     return (ieioargs[0].ToDict(), ieioargs[1])
412
413   return ieioargs
414
415
416 class RpcRunner(object):
417   """RPC runner class"""
418
419   def __init__(self, cfg):
420     """Initialized the rpc runner.
421
422     @type cfg:  C{config.ConfigWriter}
423     @param cfg: the configuration object that will be used to get data
424                 about the cluster
425
426     """
427     self._cfg = cfg
428     self.port = netutils.GetDaemonPort(constants.NODED)
429
430   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431     """Convert the given instance to a dict.
432
433     This is done via the instance's ToDict() method and additionally
434     we fill the hvparams with the cluster defaults.
435
436     @type instance: L{objects.Instance}
437     @param instance: an Instance object
438     @type hvp: dict or None
439     @param hvp: a dictionary with overridden hypervisor parameters
440     @type bep: dict or None
441     @param bep: a dictionary with overridden backend parameters
442     @type osp: dict or None
443     @param osp: a dictionary with overridden os parameters
444     @rtype: dict
445     @return: the instance dict, with the hvparams filled with the
446         cluster defaults
447
448     """
449     idict = instance.ToDict()
450     cluster = self._cfg.GetClusterInfo()
451     idict["hvparams"] = cluster.FillHV(instance)
452     if hvp is not None:
453       idict["hvparams"].update(hvp)
454     idict["beparams"] = cluster.FillBE(instance)
455     if bep is not None:
456       idict["beparams"].update(bep)
457     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458     if osp is not None:
459       idict["osparams"].update(osp)
460     for nic in idict["nics"]:
461       nic['nicparams'] = objects.FillDict(
462         cluster.nicparams[constants.PP_DEFAULT],
463         nic['nicparams'])
464     return idict
465
466   def _ConnectList(self, client, node_list, call, read_timeout=None):
467     """Helper for computing node addresses.
468
469     @type client: L{ganeti.rpc.Client}
470     @param client: a C{Client} instance
471     @type node_list: list
472     @param node_list: the node list we should connect
473     @type call: string
474     @param call: the name of the remote procedure call, for filling in
475         correctly any eventual offline nodes' results
476     @type read_timeout: int
477     @param read_timeout: overwrites the default read timeout for the
478         given operation
479
480     """
481     all_nodes = self._cfg.GetAllNodesInfo()
482     name_list = []
483     addr_list = []
484     skip_dict = {}
485     for node in node_list:
486       if node in all_nodes:
487         if all_nodes[node].offline:
488           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489           continue
490         val = all_nodes[node].primary_ip
491       else:
492         val = None
493       addr_list.append(val)
494       name_list.append(node)
495     if name_list:
496       client.ConnectList(name_list, address_list=addr_list,
497                          read_timeout=read_timeout)
498     return skip_dict
499
500   def _ConnectNode(self, client, node, call, read_timeout=None):
501     """Helper for computing one node's address.
502
503     @type client: L{ganeti.rpc.Client}
504     @param client: a C{Client} instance
505     @type node: str
506     @param node: the node we should connect
507     @type call: string
508     @param call: the name of the remote procedure call, for filling in
509         correctly any eventual offline nodes' results
510     @type read_timeout: int
511     @param read_timeout: overwrites the default read timeout for the
512         given operation
513
514     """
515     node_info = self._cfg.GetNodeInfo(node)
516     if node_info is not None:
517       if node_info.offline:
518         return RpcResult(node=node, offline=True, call=call)
519       addr = node_info.primary_ip
520     else:
521       addr = None
522     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523
524   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525     """Helper for making a multi-node call
526
527     """
528     body = serializer.DumpJson(args, indent=False)
529     c = Client(procedure, body, self.port)
530     skip_dict = self._ConnectList(c, node_list, procedure,
531                                   read_timeout=read_timeout)
532     skip_dict.update(c.GetResults())
533     return skip_dict
534
535   @classmethod
536   def _StaticMultiNodeCall(cls, node_list, procedure, args,
537                            address_list=None, read_timeout=None):
538     """Helper for making a multi-node static call
539
540     """
541     body = serializer.DumpJson(args, indent=False)
542     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
543     c.ConnectList(node_list, address_list=address_list,
544                   read_timeout=read_timeout)
545     return c.GetResults()
546
547   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548     """Helper for making a single-node call
549
550     """
551     body = serializer.DumpJson(args, indent=False)
552     c = Client(procedure, body, self.port)
553     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554     if result is None:
555       # we did connect, node is not offline
556       result = c.GetResults()[node]
557     return result
558
559   @classmethod
560   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561     """Helper for making a single-node static call
562
563     """
564     body = serializer.DumpJson(args, indent=False)
565     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
566     c.ConnectNode(node, read_timeout=read_timeout)
567     return c.GetResults()[node]
568
569   @staticmethod
570   def _Compress(data):
571     """Compresses a string for transport over RPC.
572
573     Small amounts of data are not compressed.
574
575     @type data: str
576     @param data: Data
577     @rtype: tuple
578     @return: Encoded data to send
579
580     """
581     # Small amounts of data are not compressed
582     if len(data) < 512:
583       return (constants.RPC_ENCODING_NONE, data)
584
585     # Compress with zlib and encode in base64
586     return (constants.RPC_ENCODING_ZLIB_BASE64,
587             base64.b64encode(zlib.compress(data, 3)))
588
589   #
590   # Begin RPC calls
591   #
592
593   @_RpcTimeout(_TMO_URGENT)
594   def call_bdev_sizes(self, node_list, devices):
595     """Gets the sizes of requested block devices present on a node
596
597     This is a multi-node call.
598
599     """
600     return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
601
602   @_RpcTimeout(_TMO_URGENT)
603   def call_lv_list(self, node_list, vg_name):
604     """Gets the logical volumes present in a given volume group.
605
606     This is a multi-node call.
607
608     """
609     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
610
611   @_RpcTimeout(_TMO_URGENT)
612   def call_vg_list(self, node_list):
613     """Gets the volume group list.
614
615     This is a multi-node call.
616
617     """
618     return self._MultiNodeCall(node_list, "vg_list", [])
619
620   @_RpcTimeout(_TMO_NORMAL)
621   def call_storage_list(self, node_list, su_name, su_args, name, fields):
622     """Get list of storage units.
623
624     This is a multi-node call.
625
626     """
627     return self._MultiNodeCall(node_list, "storage_list",
628                                [su_name, su_args, name, fields])
629
630   @_RpcTimeout(_TMO_NORMAL)
631   def call_storage_modify(self, node, su_name, su_args, name, changes):
632     """Modify a storage unit.
633
634     This is a single-node call.
635
636     """
637     return self._SingleNodeCall(node, "storage_modify",
638                                 [su_name, su_args, name, changes])
639
640   @_RpcTimeout(_TMO_NORMAL)
641   def call_storage_execute(self, node, su_name, su_args, name, op):
642     """Executes an operation on a storage unit.
643
644     This is a single-node call.
645
646     """
647     return self._SingleNodeCall(node, "storage_execute",
648                                 [su_name, su_args, name, op])
649
650   @_RpcTimeout(_TMO_URGENT)
651   def call_bridges_exist(self, node, bridges_list):
652     """Checks if a node has all the bridges given.
653
654     This method checks if all bridges given in the bridges_list are
655     present on the remote node, so that an instance that uses interfaces
656     on those bridges can be started.
657
658     This is a single-node call.
659
660     """
661     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
662
663   @_RpcTimeout(_TMO_NORMAL)
664   def call_instance_start(self, node, instance, hvp, bep, startup_paused):
665     """Starts an instance.
666
667     This is a single-node call.
668
669     """
670     idict = self._InstDict(instance, hvp=hvp, bep=bep)
671     return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
672
673   @_RpcTimeout(_TMO_NORMAL)
674   def call_instance_shutdown(self, node, instance, timeout):
675     """Stops an instance.
676
677     This is a single-node call.
678
679     """
680     return self._SingleNodeCall(node, "instance_shutdown",
681                                 [self._InstDict(instance), timeout])
682
683   @_RpcTimeout(_TMO_NORMAL)
684   def call_migration_info(self, node, instance):
685     """Gather the information necessary to prepare an instance migration.
686
687     This is a single-node call.
688
689     @type node: string
690     @param node: the node on which the instance is currently running
691     @type instance: C{objects.Instance}
692     @param instance: the instance definition
693
694     """
695     return self._SingleNodeCall(node, "migration_info",
696                                 [self._InstDict(instance)])
697
698   @_RpcTimeout(_TMO_NORMAL)
699   def call_accept_instance(self, node, instance, info, target):
700     """Prepare a node to accept an instance.
701
702     This is a single-node call.
703
704     @type node: string
705     @param node: the target node for the migration
706     @type instance: C{objects.Instance}
707     @param instance: the instance definition
708     @type info: opaque/hypervisor specific (string/data)
709     @param info: result for the call_migration_info call
710     @type target: string
711     @param target: target hostname (usually ip address) (on the node itself)
712
713     """
714     return self._SingleNodeCall(node, "accept_instance",
715                                 [self._InstDict(instance), info, target])
716
717   @_RpcTimeout(_TMO_NORMAL)
718   def call_finalize_migration(self, node, instance, info, success):
719     """Finalize any target-node migration specific operation.
720
721     This is called both in case of a successful migration and in case of error
722     (in which case it should abort the migration).
723
724     This is a single-node call.
725
726     @type node: string
727     @param node: the target node for the migration
728     @type instance: C{objects.Instance}
729     @param instance: the instance definition
730     @type info: opaque/hypervisor specific (string/data)
731     @param info: result for the call_migration_info call
732     @type success: boolean
733     @param success: whether the migration was a success or a failure
734
735     """
736     return self._SingleNodeCall(node, "finalize_migration",
737                                 [self._InstDict(instance), info, success])
738
739   @_RpcTimeout(_TMO_SLOW)
740   def call_instance_migrate(self, node, instance, target, live):
741     """Migrate an instance.
742
743     This is a single-node call.
744
745     @type node: string
746     @param node: the node on which the instance is currently running
747     @type instance: C{objects.Instance}
748     @param instance: the instance definition
749     @type target: string
750     @param target: the target node name
751     @type live: boolean
752     @param live: whether the migration should be done live or not (the
753         interpretation of this parameter is left to the hypervisor)
754
755     """
756     return self._SingleNodeCall(node, "instance_migrate",
757                                 [self._InstDict(instance), target, live])
758
759   @_RpcTimeout(_TMO_NORMAL)
760   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
761     """Reboots an instance.
762
763     This is a single-node call.
764
765     """
766     return self._SingleNodeCall(node, "instance_reboot",
767                                 [self._InstDict(inst), reboot_type,
768                                  shutdown_timeout])
769
770   @_RpcTimeout(_TMO_1DAY)
771   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
772     """Installs an OS on the given instance.
773
774     This is a single-node call.
775
776     """
777     return self._SingleNodeCall(node, "instance_os_add",
778                                 [self._InstDict(inst, osp=osparams),
779                                  reinstall, debug])
780
781   @_RpcTimeout(_TMO_SLOW)
782   def call_instance_run_rename(self, node, inst, old_name, debug):
783     """Run the OS rename script for an instance.
784
785     This is a single-node call.
786
787     """
788     return self._SingleNodeCall(node, "instance_run_rename",
789                                 [self._InstDict(inst), old_name, debug])
790
791   @_RpcTimeout(_TMO_URGENT)
792   def call_instance_info(self, node, instance, hname):
793     """Returns information about a single instance.
794
795     This is a single-node call.
796
797     @type node: list
798     @param node: the list of nodes to query
799     @type instance: string
800     @param instance: the instance name
801     @type hname: string
802     @param hname: the hypervisor type of the instance
803
804     """
805     return self._SingleNodeCall(node, "instance_info", [instance, hname])
806
807   @_RpcTimeout(_TMO_NORMAL)
808   def call_instance_migratable(self, node, instance):
809     """Checks whether the given instance can be migrated.
810
811     This is a single-node call.
812
813     @param node: the node to query
814     @type instance: L{objects.Instance}
815     @param instance: the instance to check
816
817
818     """
819     return self._SingleNodeCall(node, "instance_migratable",
820                                 [self._InstDict(instance)])
821
822   @_RpcTimeout(_TMO_URGENT)
823   def call_all_instances_info(self, node_list, hypervisor_list):
824     """Returns information about all instances on the given nodes.
825
826     This is a multi-node call.
827
828     @type node_list: list
829     @param node_list: the list of nodes to query
830     @type hypervisor_list: list
831     @param hypervisor_list: the hypervisors to query for instances
832
833     """
834     return self._MultiNodeCall(node_list, "all_instances_info",
835                                [hypervisor_list])
836
837   @_RpcTimeout(_TMO_URGENT)
838   def call_instance_list(self, node_list, hypervisor_list):
839     """Returns the list of running instances on a given node.
840
841     This is a multi-node call.
842
843     @type node_list: list
844     @param node_list: the list of nodes to query
845     @type hypervisor_list: list
846     @param hypervisor_list: the hypervisors to query for instances
847
848     """
849     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
850
851   @_RpcTimeout(_TMO_FAST)
852   def call_node_tcp_ping(self, node, source, target, port, timeout,
853                          live_port_needed):
854     """Do a TcpPing on the remote node
855
856     This is a single-node call.
857
858     """
859     return self._SingleNodeCall(node, "node_tcp_ping",
860                                 [source, target, port, timeout,
861                                  live_port_needed])
862
863   @_RpcTimeout(_TMO_FAST)
864   def call_node_has_ip_address(self, node, address):
865     """Checks if a node has the given IP address.
866
867     This is a single-node call.
868
869     """
870     return self._SingleNodeCall(node, "node_has_ip_address", [address])
871
872   @_RpcTimeout(_TMO_URGENT)
873   def call_node_info(self, node_list, vg_name, hypervisor_type):
874     """Return node information.
875
876     This will return memory information and volume group size and free
877     space.
878
879     This is a multi-node call.
880
881     @type node_list: list
882     @param node_list: the list of nodes to query
883     @type vg_name: C{string}
884     @param vg_name: the name of the volume group to ask for disk space
885         information
886     @type hypervisor_type: C{str}
887     @param hypervisor_type: the name of the hypervisor to ask for
888         memory information
889
890     """
891     return self._MultiNodeCall(node_list, "node_info",
892                                [vg_name, hypervisor_type])
893
894   @_RpcTimeout(_TMO_NORMAL)
895   def call_etc_hosts_modify(self, node, mode, name, ip):
896     """Modify hosts file with name
897
898     @type node: string
899     @param node: The node to call
900     @type mode: string
901     @param mode: The mode to operate. Currently "add" or "remove"
902     @type name: string
903     @param name: The host name to be modified
904     @type ip: string
905     @param ip: The ip of the entry (just valid if mode is "add")
906
907     """
908     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
909
910   @_RpcTimeout(_TMO_NORMAL)
911   def call_node_verify(self, node_list, checkdict, cluster_name):
912     """Request verification of given parameters.
913
914     This is a multi-node call.
915
916     """
917     return self._MultiNodeCall(node_list, "node_verify",
918                                [checkdict, cluster_name])
919
920   @classmethod
921   @_RpcTimeout(_TMO_FAST)
922   def call_node_start_master(cls, node, start_daemons, no_voting):
923     """Tells a node to activate itself as a master.
924
925     This is a single-node call.
926
927     """
928     return cls._StaticSingleNodeCall(node, "node_start_master",
929                                      [start_daemons, no_voting])
930
931   @classmethod
932   @_RpcTimeout(_TMO_FAST)
933   def call_node_stop_master(cls, node, stop_daemons):
934     """Tells a node to demote itself from master status.
935
936     This is a single-node call.
937
938     """
939     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
940
941   @classmethod
942   @_RpcTimeout(_TMO_URGENT)
943   def call_master_info(cls, node_list):
944     """Query master info.
945
946     This is a multi-node call.
947
948     """
949     # TODO: should this method query down nodes?
950     return cls._StaticMultiNodeCall(node_list, "master_info", [])
951
952   @classmethod
953   @_RpcTimeout(_TMO_URGENT)
954   def call_version(cls, node_list):
955     """Query node version.
956
957     This is a multi-node call.
958
959     """
960     return cls._StaticMultiNodeCall(node_list, "version", [])
961
962   @_RpcTimeout(_TMO_NORMAL)
963   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
964     """Request creation of a given block device.
965
966     This is a single-node call.
967
968     """
969     return self._SingleNodeCall(node, "blockdev_create",
970                                 [bdev.ToDict(), size, owner, on_primary, info])
971
972   @_RpcTimeout(_TMO_SLOW)
973   def call_blockdev_wipe(self, node, bdev, offset, size):
974     """Request wipe at given offset with given size of a block device.
975
976     This is a single-node call.
977
978     """
979     return self._SingleNodeCall(node, "blockdev_wipe",
980                                 [bdev.ToDict(), offset, size])
981
982   @_RpcTimeout(_TMO_NORMAL)
983   def call_blockdev_remove(self, node, bdev):
984     """Request removal of a given block device.
985
986     This is a single-node call.
987
988     """
989     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
990
991   @_RpcTimeout(_TMO_NORMAL)
992   def call_blockdev_rename(self, node, devlist):
993     """Request rename of the given block devices.
994
995     This is a single-node call.
996
997     """
998     return self._SingleNodeCall(node, "blockdev_rename",
999                                 [(d.ToDict(), uid) for d, uid in devlist])
1000
1001   @_RpcTimeout(_TMO_NORMAL)
1002   def call_blockdev_pause_resume_sync(self, node, disks, pause):
1003     """Request a pause/resume of given block device.
1004
1005     This is a single-node call.
1006
1007     """
1008     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1009                                 [[bdev.ToDict() for bdev in disks], pause])
1010
1011   @_RpcTimeout(_TMO_NORMAL)
1012   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1013     """Request assembling of a given block device.
1014
1015     This is a single-node call.
1016
1017     """
1018     return self._SingleNodeCall(node, "blockdev_assemble",
1019                                 [disk.ToDict(), owner, on_primary, idx])
1020
1021   @_RpcTimeout(_TMO_NORMAL)
1022   def call_blockdev_shutdown(self, node, disk):
1023     """Request shutdown of a given block device.
1024
1025     This is a single-node call.
1026
1027     """
1028     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1029
1030   @_RpcTimeout(_TMO_NORMAL)
1031   def call_blockdev_addchildren(self, node, bdev, ndevs):
1032     """Request adding a list of children to a (mirroring) device.
1033
1034     This is a single-node call.
1035
1036     """
1037     return self._SingleNodeCall(node, "blockdev_addchildren",
1038                                 [bdev.ToDict(),
1039                                  [disk.ToDict() for disk in ndevs]])
1040
1041   @_RpcTimeout(_TMO_NORMAL)
1042   def call_blockdev_removechildren(self, node, bdev, ndevs):
1043     """Request removing a list of children from a (mirroring) device.
1044
1045     This is a single-node call.
1046
1047     """
1048     return self._SingleNodeCall(node, "blockdev_removechildren",
1049                                 [bdev.ToDict(),
1050                                  [disk.ToDict() for disk in ndevs]])
1051
1052   @_RpcTimeout(_TMO_NORMAL)
1053   def call_blockdev_getmirrorstatus(self, node, disks):
1054     """Request status of a (mirroring) device.
1055
1056     This is a single-node call.
1057
1058     """
1059     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1060                                   [dsk.ToDict() for dsk in disks])
1061     if not result.fail_msg:
1062       result.payload = [objects.BlockDevStatus.FromDict(i)
1063                         for i in result.payload]
1064     return result
1065
1066   @_RpcTimeout(_TMO_NORMAL)
1067   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1068     """Request status of (mirroring) devices from multiple nodes.
1069
1070     This is a multi-node call.
1071
1072     """
1073     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1074                                  [dict((name, [dsk.ToDict() for dsk in disks])
1075                                        for name, disks in node_disks.items())])
1076     for nres in result.values():
1077       if nres.fail_msg:
1078         continue
1079
1080       for idx, (success, status) in enumerate(nres.payload):
1081         if success:
1082           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1083
1084     return result
1085
1086   @_RpcTimeout(_TMO_NORMAL)
1087   def call_blockdev_find(self, node, disk):
1088     """Request identification of a given block device.
1089
1090     This is a single-node call.
1091
1092     """
1093     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1094     if not result.fail_msg and result.payload is not None:
1095       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1096     return result
1097
1098   @_RpcTimeout(_TMO_NORMAL)
1099   def call_blockdev_close(self, node, instance_name, disks):
1100     """Closes the given block devices.
1101
1102     This is a single-node call.
1103
1104     """
1105     params = [instance_name, [cf.ToDict() for cf in disks]]
1106     return self._SingleNodeCall(node, "blockdev_close", params)
1107
1108   @_RpcTimeout(_TMO_NORMAL)
1109   def call_blockdev_getsize(self, node, disks):
1110     """Returns the size of the given disks.
1111
1112     This is a single-node call.
1113
1114     """
1115     params = [[cf.ToDict() for cf in disks]]
1116     return self._SingleNodeCall(node, "blockdev_getsize", params)
1117
1118   @_RpcTimeout(_TMO_NORMAL)
1119   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1120     """Disconnects the network of the given drbd devices.
1121
1122     This is a multi-node call.
1123
1124     """
1125     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1126                                [nodes_ip, [cf.ToDict() for cf in disks]])
1127
1128   @_RpcTimeout(_TMO_NORMAL)
1129   def call_drbd_attach_net(self, node_list, nodes_ip,
1130                            disks, instance_name, multimaster):
1131     """Disconnects the given drbd devices.
1132
1133     This is a multi-node call.
1134
1135     """
1136     return self._MultiNodeCall(node_list, "drbd_attach_net",
1137                                [nodes_ip, [cf.ToDict() for cf in disks],
1138                                 instance_name, multimaster])
1139
1140   @_RpcTimeout(_TMO_SLOW)
1141   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1142     """Waits for the synchronization of drbd devices is complete.
1143
1144     This is a multi-node call.
1145
1146     """
1147     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1148                                [nodes_ip, [cf.ToDict() for cf in disks]])
1149
1150   @_RpcTimeout(_TMO_URGENT)
1151   def call_drbd_helper(self, node_list):
1152     """Gets drbd helper.
1153
1154     This is a multi-node call.
1155
1156     """
1157     return self._MultiNodeCall(node_list, "drbd_helper", [])
1158
1159   @classmethod
1160   @_RpcTimeout(_TMO_NORMAL)
1161   def call_upload_file(cls, node_list, file_name, address_list=None):
1162     """Upload a file.
1163
1164     The node will refuse the operation in case the file is not on the
1165     approved file list.
1166
1167     This is a multi-node call.
1168
1169     @type node_list: list
1170     @param node_list: the list of node names to upload to
1171     @type file_name: str
1172     @param file_name: the filename to upload
1173     @type address_list: list or None
1174     @keyword address_list: an optional list of node addresses, in order
1175         to optimize the RPC speed
1176
1177     """
1178     file_contents = utils.ReadFile(file_name)
1179     data = cls._Compress(file_contents)
1180     st = os.stat(file_name)
1181     getents = runtime.GetEnts()
1182     params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1183               getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1184     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1185                                     address_list=address_list)
1186
1187   @classmethod
1188   @_RpcTimeout(_TMO_NORMAL)
1189   def call_write_ssconf_files(cls, node_list, values):
1190     """Write ssconf files.
1191
1192     This is a multi-node call.
1193
1194     """
1195     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1196
1197   @_RpcTimeout(_TMO_NORMAL)
1198   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1199     """Runs OOB.
1200
1201     This is a single-node call.
1202
1203     """
1204     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1205                                                   remote_node, timeout])
1206
1207   @_RpcTimeout(_TMO_FAST)
1208   def call_os_diagnose(self, node_list):
1209     """Request a diagnose of OS definitions.
1210
1211     This is a multi-node call.
1212
1213     """
1214     return self._MultiNodeCall(node_list, "os_diagnose", [])
1215
1216   @_RpcTimeout(_TMO_FAST)
1217   def call_os_get(self, node, name):
1218     """Returns an OS definition.
1219
1220     This is a single-node call.
1221
1222     """
1223     result = self._SingleNodeCall(node, "os_get", [name])
1224     if not result.fail_msg and isinstance(result.payload, dict):
1225       result.payload = objects.OS.FromDict(result.payload)
1226     return result
1227
1228   @_RpcTimeout(_TMO_FAST)
1229   def call_os_validate(self, required, nodes, name, checks, params):
1230     """Run a validation routine for a given OS.
1231
1232     This is a multi-node call.
1233
1234     """
1235     return self._MultiNodeCall(nodes, "os_validate",
1236                                [required, name, checks, params])
1237
1238   @_RpcTimeout(_TMO_NORMAL)
1239   def call_hooks_runner(self, node_list, hpath, phase, env):
1240     """Call the hooks runner.
1241
1242     Args:
1243       - op: the OpCode instance
1244       - env: a dictionary with the environment
1245
1246     This is a multi-node call.
1247
1248     """
1249     params = [hpath, phase, env]
1250     return self._MultiNodeCall(node_list, "hooks_runner", params)
1251
1252   @_RpcTimeout(_TMO_NORMAL)
1253   def call_iallocator_runner(self, node, name, idata):
1254     """Call an iallocator on a remote node
1255
1256     Args:
1257       - name: the iallocator name
1258       - input: the json-encoded input string
1259
1260     This is a single-node call.
1261
1262     """
1263     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1264
1265   @_RpcTimeout(_TMO_NORMAL)
1266   def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1267     """Request a snapshot of the given block device.
1268
1269     This is a single-node call.
1270
1271     """
1272     return self._SingleNodeCall(node, "blockdev_grow",
1273                                 [cf_bdev.ToDict(), amount, dryrun])
1274
1275   @_RpcTimeout(_TMO_1DAY)
1276   def call_blockdev_export(self, node, cf_bdev,
1277                            dest_node, dest_path, cluster_name):
1278     """Export a given disk to another node.
1279
1280     This is a single-node call.
1281
1282     """
1283     return self._SingleNodeCall(node, "blockdev_export",
1284                                 [cf_bdev.ToDict(), dest_node, dest_path,
1285                                  cluster_name])
1286
1287   @_RpcTimeout(_TMO_NORMAL)
1288   def call_blockdev_snapshot(self, node, cf_bdev):
1289     """Request a snapshot of the given block device.
1290
1291     This is a single-node call.
1292
1293     """
1294     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1295
1296   @_RpcTimeout(_TMO_NORMAL)
1297   def call_finalize_export(self, node, instance, snap_disks):
1298     """Request the completion of an export operation.
1299
1300     This writes the export config file, etc.
1301
1302     This is a single-node call.
1303
1304     """
1305     flat_disks = []
1306     for disk in snap_disks:
1307       if isinstance(disk, bool):
1308         flat_disks.append(disk)
1309       else:
1310         flat_disks.append(disk.ToDict())
1311
1312     return self._SingleNodeCall(node, "finalize_export",
1313                                 [self._InstDict(instance), flat_disks])
1314
1315   @_RpcTimeout(_TMO_FAST)
1316   def call_export_info(self, node, path):
1317     """Queries the export information in a given path.
1318
1319     This is a single-node call.
1320
1321     """
1322     return self._SingleNodeCall(node, "export_info", [path])
1323
1324   @_RpcTimeout(_TMO_FAST)
1325   def call_export_list(self, node_list):
1326     """Gets the stored exports list.
1327
1328     This is a multi-node call.
1329
1330     """
1331     return self._MultiNodeCall(node_list, "export_list", [])
1332
1333   @_RpcTimeout(_TMO_FAST)
1334   def call_export_remove(self, node, export):
1335     """Requests removal of a given export.
1336
1337     This is a single-node call.
1338
1339     """
1340     return self._SingleNodeCall(node, "export_remove", [export])
1341
1342   @classmethod
1343   @_RpcTimeout(_TMO_NORMAL)
1344   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1345     """Requests a node to clean the cluster information it has.
1346
1347     This will remove the configuration information from the ganeti data
1348     dir.
1349
1350     This is a single-node call.
1351
1352     """
1353     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1354                                      [modify_ssh_setup])
1355
1356   @_RpcTimeout(_TMO_FAST)
1357   def call_node_volumes(self, node_list):
1358     """Gets all volumes on node(s).
1359
1360     This is a multi-node call.
1361
1362     """
1363     return self._MultiNodeCall(node_list, "node_volumes", [])
1364
1365   @_RpcTimeout(_TMO_FAST)
1366   def call_node_demote_from_mc(self, node):
1367     """Demote a node from the master candidate role.
1368
1369     This is a single-node call.
1370
1371     """
1372     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1373
1374   @_RpcTimeout(_TMO_NORMAL)
1375   def call_node_powercycle(self, node, hypervisor):
1376     """Tries to powercycle a node.
1377
1378     This is a single-node call.
1379
1380     """
1381     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1382
1383   @_RpcTimeout(None)
1384   def call_test_delay(self, node_list, duration):
1385     """Sleep for a fixed time on given node(s).
1386
1387     This is a multi-node call.
1388
1389     """
1390     return self._MultiNodeCall(node_list, "test_delay", [duration],
1391                                read_timeout=int(duration + 5))
1392
1393   @_RpcTimeout(_TMO_FAST)
1394   def call_file_storage_dir_create(self, node, file_storage_dir):
1395     """Create the given file storage directory.
1396
1397     This is a single-node call.
1398
1399     """
1400     return self._SingleNodeCall(node, "file_storage_dir_create",
1401                                 [file_storage_dir])
1402
1403   @_RpcTimeout(_TMO_FAST)
1404   def call_file_storage_dir_remove(self, node, file_storage_dir):
1405     """Remove the given file storage directory.
1406
1407     This is a single-node call.
1408
1409     """
1410     return self._SingleNodeCall(node, "file_storage_dir_remove",
1411                                 [file_storage_dir])
1412
1413   @_RpcTimeout(_TMO_FAST)
1414   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1415                                    new_file_storage_dir):
1416     """Rename file storage directory.
1417
1418     This is a single-node call.
1419
1420     """
1421     return self._SingleNodeCall(node, "file_storage_dir_rename",
1422                                 [old_file_storage_dir, new_file_storage_dir])
1423
1424   @classmethod
1425   @_RpcTimeout(_TMO_URGENT)
1426   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1427     """Update job queue.
1428
1429     This is a multi-node call.
1430
1431     """
1432     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1433                                     [file_name, cls._Compress(content)],
1434                                     address_list=address_list)
1435
1436   @classmethod
1437   @_RpcTimeout(_TMO_NORMAL)
1438   def call_jobqueue_purge(cls, node):
1439     """Purge job queue.
1440
1441     This is a single-node call.
1442
1443     """
1444     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1445
1446   @classmethod
1447   @_RpcTimeout(_TMO_URGENT)
1448   def call_jobqueue_rename(cls, node_list, address_list, rename):
1449     """Rename a job queue file.
1450
1451     This is a multi-node call.
1452
1453     """
1454     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1455                                     address_list=address_list)
1456
1457   @_RpcTimeout(_TMO_NORMAL)
1458   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1459     """Validate the hypervisor params.
1460
1461     This is a multi-node call.
1462
1463     @type node_list: list
1464     @param node_list: the list of nodes to query
1465     @type hvname: string
1466     @param hvname: the hypervisor name
1467     @type hvparams: dict
1468     @param hvparams: the hypervisor parameters to be validated
1469
1470     """
1471     cluster = self._cfg.GetClusterInfo()
1472     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1473     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1474                                [hvname, hv_full])
1475
1476   @_RpcTimeout(_TMO_NORMAL)
1477   def call_x509_cert_create(self, node, validity):
1478     """Creates a new X509 certificate for SSL/TLS.
1479
1480     This is a single-node call.
1481
1482     @type validity: int
1483     @param validity: Validity in seconds
1484
1485     """
1486     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1487
1488   @_RpcTimeout(_TMO_NORMAL)
1489   def call_x509_cert_remove(self, node, name):
1490     """Removes a X509 certificate.
1491
1492     This is a single-node call.
1493
1494     @type name: string
1495     @param name: Certificate name
1496
1497     """
1498     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1499
1500   @_RpcTimeout(_TMO_NORMAL)
1501   def call_import_start(self, node, opts, instance, component,
1502                         dest, dest_args):
1503     """Starts a listener for an import.
1504
1505     This is a single-node call.
1506
1507     @type node: string
1508     @param node: Node name
1509     @type instance: C{objects.Instance}
1510     @param instance: Instance object
1511     @type component: string
1512     @param component: which part of the instance is being imported
1513
1514     """
1515     return self._SingleNodeCall(node, "import_start",
1516                                 [opts.ToDict(),
1517                                  self._InstDict(instance), component, dest,
1518                                  _EncodeImportExportIO(dest, dest_args)])
1519
1520   @_RpcTimeout(_TMO_NORMAL)
1521   def call_export_start(self, node, opts, host, port,
1522                         instance, component, source, source_args):
1523     """Starts an export daemon.
1524
1525     This is a single-node call.
1526
1527     @type node: string
1528     @param node: Node name
1529     @type instance: C{objects.Instance}
1530     @param instance: Instance object
1531     @type component: string
1532     @param component: which part of the instance is being imported
1533
1534     """
1535     return self._SingleNodeCall(node, "export_start",
1536                                 [opts.ToDict(), host, port,
1537                                  self._InstDict(instance),
1538                                  component, source,
1539                                  _EncodeImportExportIO(source, source_args)])
1540
1541   @_RpcTimeout(_TMO_FAST)
1542   def call_impexp_status(self, node, names):
1543     """Gets the status of an import or export.
1544
1545     This is a single-node call.
1546
1547     @type node: string
1548     @param node: Node name
1549     @type names: List of strings
1550     @param names: Import/export names
1551     @rtype: List of L{objects.ImportExportStatus} instances
1552     @return: Returns a list of the state of each named import/export or None if
1553              a status couldn't be retrieved
1554
1555     """
1556     result = self._SingleNodeCall(node, "impexp_status", [names])
1557
1558     if not result.fail_msg:
1559       decoded = []
1560
1561       for i in result.payload:
1562         if i is None:
1563           decoded.append(None)
1564           continue
1565         decoded.append(objects.ImportExportStatus.FromDict(i))
1566
1567       result.payload = decoded
1568
1569     return result
1570
1571   @_RpcTimeout(_TMO_NORMAL)
1572   def call_impexp_abort(self, node, name):
1573     """Aborts an import or export.
1574
1575     This is a single-node call.
1576
1577     @type node: string
1578     @param node: Node name
1579     @type name: string
1580     @param name: Import/export name
1581
1582     """
1583     return self._SingleNodeCall(node, "impexp_abort", [name])
1584
1585   @_RpcTimeout(_TMO_NORMAL)
1586   def call_impexp_cleanup(self, node, name):
1587     """Cleans up after an import or export.
1588
1589     This is a single-node call.
1590
1591     @type node: string
1592     @param node: Node name
1593     @type name: string
1594     @param name: Import/export name
1595
1596     """
1597     return self._SingleNodeCall(node, "impexp_cleanup", [name])