Fix wrong option names in QA and cluster-merge
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49
50 # pylint has a bug here, doesn't see this import
51 import ganeti.http.client  # pylint: disable=W0611
52
53
54 # Timeout for connecting to nodes (seconds)
55 _RPC_CONNECT_TIMEOUT = 5
56
57 _RPC_CLIENT_HEADERS = [
58   "Content-type: %s" % http.HTTP_APP_JSON,
59   "Expect:",
60   ]
61
62 # Various time constants for the timeout table
63 _TMO_URGENT = 60 # one minute
64 _TMO_FAST = 5 * 60 # five minutes
65 _TMO_NORMAL = 15 * 60 # 15 minutes
66 _TMO_SLOW = 3600 # one hour
67 _TMO_4HRS = 4 * 3600
68 _TMO_1DAY = 86400
69
70 # Timeout table that will be built later by decorators
71 # Guidelines for choosing timeouts:
72 # - call used during watcher: timeout -> 1min, _TMO_URGENT
73 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74 # - other calls: 15 min, _TMO_NORMAL
75 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76
77 _TIMEOUTS = {
78 }
79
80
81 def Init():
82   """Initializes the module-global HTTP client manager.
83
84   Must be called before using any RPC function and while exactly one thread is
85   running.
86
87   """
88   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89   # one thread running. This check is just a safety measure -- it doesn't
90   # cover all cases.
91   assert threading.activeCount() == 1, \
92          "Found more than one active thread when initializing pycURL"
93
94   logging.info("Using PycURL %s", pycurl.version)
95
96   pycurl.global_init(pycurl.GLOBAL_ALL)
97
98
99 def Shutdown():
100   """Stops the module-global HTTP client manager.
101
102   Must be called before quitting the program and while exactly one thread is
103   running.
104
105   """
106   pycurl.global_cleanup()
107
108
109 def _ConfigRpcCurl(curl):
110   noded_cert = str(constants.NODED_CERT_FILE)
111
112   curl.setopt(pycurl.FOLLOWLOCATION, False)
113   curl.setopt(pycurl.CAINFO, noded_cert)
114   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115   curl.setopt(pycurl.SSL_VERIFYPEER, True)
116   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117   curl.setopt(pycurl.SSLCERT, noded_cert)
118   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119   curl.setopt(pycurl.SSLKEY, noded_cert)
120   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121
122
123 # Aliasing this module avoids the following warning by epydoc: "Warning: No
124 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125 _threading = threading
126
127
128 class _RpcThreadLocal(_threading.local):
129   def GetHttpClientPool(self):
130     """Returns a per-thread HTTP client pool.
131
132     @rtype: L{http.client.HttpClientPool}
133
134     """
135     try:
136       pool = self.hcp
137     except AttributeError:
138       pool = http.client.HttpClientPool(_ConfigRpcCurl)
139       self.hcp = pool
140
141     return pool
142
143
144 # Remove module alias (see above)
145 del _threading
146
147
148 _thread_local = _RpcThreadLocal()
149
150
151 def _RpcTimeout(secs):
152   """Timeout decorator.
153
154   When applied to a rpc call_* function, it updates the global timeout
155   table with the given function/timeout.
156
157   """
158   def decorator(f):
159     name = f.__name__
160     assert name.startswith("call_")
161     _TIMEOUTS[name[len("call_"):]] = secs
162     return f
163   return decorator
164
165
166 def RunWithRPC(fn):
167   """RPC-wrapper decorator.
168
169   When applied to a function, it runs it with the RPC system
170   initialized, and it shutsdown the system afterwards. This means the
171   function must be called without RPC being initialized.
172
173   """
174   def wrapper(*args, **kwargs):
175     Init()
176     try:
177       return fn(*args, **kwargs)
178     finally:
179       Shutdown()
180   return wrapper
181
182
183 class RpcResult(object):
184   """RPC Result class.
185
186   This class holds an RPC result. It is needed since in multi-node
187   calls we can't raise an exception just because one one out of many
188   failed, and therefore we use this class to encapsulate the result.
189
190   @ivar data: the data payload, for successful results, or None
191   @ivar call: the name of the RPC call
192   @ivar node: the name of the node to which we made the call
193   @ivar offline: whether the operation failed because the node was
194       offline, as opposed to actual failure; offline=True will always
195       imply failed=True, in order to allow simpler checking if
196       the user doesn't care about the exact failure mode
197   @ivar fail_msg: the error message if the call failed
198
199   """
200   def __init__(self, data=None, failed=False, offline=False,
201                call=None, node=None):
202     self.offline = offline
203     self.call = call
204     self.node = node
205
206     if offline:
207       self.fail_msg = "Node is marked offline"
208       self.data = self.payload = None
209     elif failed:
210       self.fail_msg = self._EnsureErr(data)
211       self.data = self.payload = None
212     else:
213       self.data = data
214       if not isinstance(self.data, (tuple, list)):
215         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216                          type(self.data))
217         self.payload = None
218       elif len(data) != 2:
219         self.fail_msg = ("RPC layer error: invalid result length (%d), "
220                          "expected 2" % len(self.data))
221         self.payload = None
222       elif not self.data[0]:
223         self.fail_msg = self._EnsureErr(self.data[1])
224         self.payload = None
225       else:
226         # finally success
227         self.fail_msg = None
228         self.payload = data[1]
229
230     for attr_name in ["call", "data", "fail_msg",
231                       "node", "offline", "payload"]:
232       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233
234   @staticmethod
235   def _EnsureErr(val):
236     """Helper to ensure we return a 'True' value for error."""
237     if val:
238       return val
239     else:
240       return "No error information"
241
242   def Raise(self, msg, prereq=False, ecode=None):
243     """If the result has failed, raise an OpExecError.
244
245     This is used so that LU code doesn't have to check for each
246     result, but instead can call this function.
247
248     """
249     if not self.fail_msg:
250       return
251
252     if not msg: # one could pass None for default message
253       msg = ("Call '%s' to node '%s' has failed: %s" %
254              (self.call, self.node, self.fail_msg))
255     else:
256       msg = "%s: %s" % (msg, self.fail_msg)
257     if prereq:
258       ec = errors.OpPrereqError
259     else:
260       ec = errors.OpExecError
261     if ecode is not None:
262       args = (msg, ecode)
263     else:
264       args = (msg, )
265     raise ec(*args) # pylint: disable=W0142
266
267
268 def _AddressLookup(node_list,
269                    ssc=ssconf.SimpleStore,
270                    nslookup_fn=netutils.Hostname.GetIP):
271   """Return addresses for given node names.
272
273   @type node_list: list
274   @param node_list: List of node names
275   @type ssc: class
276   @param ssc: SimpleStore class that is used to obtain node->ip mappings
277   @type nslookup_fn: callable
278   @param nslookup_fn: function use to do NS lookup
279   @rtype: list of addresses and/or None's
280   @returns: List of corresponding addresses, if found
281
282   """
283   ss = ssc()
284   iplist = ss.GetNodePrimaryIPList()
285   family = ss.GetPrimaryIPFamily()
286   addresses = []
287   ipmap = dict(entry.split() for entry in iplist)
288   for node in node_list:
289     address = ipmap.get(node)
290     if address is None:
291       address = nslookup_fn(node, family=family)
292     addresses.append(address)
293
294   return addresses
295
296
297 class Client:
298   """RPC Client class.
299
300   This class, given a (remote) method name, a list of parameters and a
301   list of nodes, will contact (in parallel) all nodes, and return a
302   dict of results (key: node name, value: result).
303
304   One current bug is that generic failure is still signaled by
305   'False' result, which is not good. This overloading of values can
306   cause bugs.
307
308   """
309   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311                                     " timeouts table")
312     self.procedure = procedure
313     self.body = body
314     self.port = port
315     self._request = {}
316     self._address_lookup_fn = address_lookup_fn
317
318   def ConnectList(self, node_list, address_list=None, read_timeout=None):
319     """Add a list of nodes to the target nodes.
320
321     @type node_list: list
322     @param node_list: the list of node names to connect
323     @type address_list: list or None
324     @keyword address_list: either None or a list with node addresses,
325         which must have the same length as the node list
326     @type read_timeout: int
327     @param read_timeout: overwrites default timeout for operation
328
329     """
330     if address_list is None:
331       # Always use IP address instead of node name
332       address_list = self._address_lookup_fn(node_list)
333
334     assert len(node_list) == len(address_list), \
335            "Name and address lists must have the same length"
336
337     for node, address in zip(node_list, address_list):
338       self.ConnectNode(node, address, read_timeout=read_timeout)
339
340   def ConnectNode(self, name, address=None, read_timeout=None):
341     """Add a node to the target list.
342
343     @type name: str
344     @param name: the node name
345     @type address: str
346     @param address: the node address, if known
347     @type read_timeout: int
348     @param read_timeout: overwrites default timeout for operation
349
350     """
351     if address is None:
352       # Always use IP address instead of node name
353       address = self._address_lookup_fn([name])[0]
354
355     assert(address is not None)
356
357     if read_timeout is None:
358       read_timeout = _TIMEOUTS[self.procedure]
359
360     self._request[name] = \
361       http.client.HttpClientRequest(str(address), self.port,
362                                     http.HTTP_PUT, str("/%s" % self.procedure),
363                                     headers=_RPC_CLIENT_HEADERS,
364                                     post_data=str(self.body),
365                                     read_timeout=read_timeout)
366
367   def GetResults(self, http_pool=None):
368     """Call nodes and return results.
369
370     @rtype: list
371     @return: List of RPC results
372
373     """
374     if not http_pool:
375       http_pool = http.client.HttpClientPool(_ConfigRpcCurl)
376
377     http_pool.ProcessRequests(self._request.values())
378
379     results = {}
380
381     for name, req in self._request.iteritems():
382       if req.success and req.resp_status_code == http.HTTP_OK:
383         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384                                   node=name, call=self.procedure)
385         continue
386
387       # TODO: Better error reporting
388       if req.error:
389         msg = req.error
390       else:
391         msg = req.resp_body
392
393       logging.error("RPC error in %s from node %s: %s",
394                     self.procedure, name, msg)
395       results[name] = RpcResult(data=msg, failed=True, node=name,
396                                 call=self.procedure)
397
398     return results
399
400
401 def _EncodeImportExportIO(ieio, ieioargs):
402   """Encodes import/export I/O information.
403
404   """
405   if ieio == constants.IEIO_RAW_DISK:
406     assert len(ieioargs) == 1
407     return (ieioargs[0].ToDict(), )
408
409   if ieio == constants.IEIO_SCRIPT:
410     assert len(ieioargs) == 2
411     return (ieioargs[0].ToDict(), ieioargs[1])
412
413   return ieioargs
414
415
416 class RpcRunner(object):
417   """RPC runner class"""
418
419   def __init__(self, cfg):
420     """Initialized the rpc runner.
421
422     @type cfg:  C{config.ConfigWriter}
423     @param cfg: the configuration object that will be used to get data
424                 about the cluster
425
426     """
427     self._cfg = cfg
428     self.port = netutils.GetDaemonPort(constants.NODED)
429
430   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431     """Convert the given instance to a dict.
432
433     This is done via the instance's ToDict() method and additionally
434     we fill the hvparams with the cluster defaults.
435
436     @type instance: L{objects.Instance}
437     @param instance: an Instance object
438     @type hvp: dict or None
439     @param hvp: a dictionary with overridden hypervisor parameters
440     @type bep: dict or None
441     @param bep: a dictionary with overridden backend parameters
442     @type osp: dict or None
443     @param osp: a dictionary with overridden os parameters
444     @rtype: dict
445     @return: the instance dict, with the hvparams filled with the
446         cluster defaults
447
448     """
449     idict = instance.ToDict()
450     cluster = self._cfg.GetClusterInfo()
451     idict["hvparams"] = cluster.FillHV(instance)
452     if hvp is not None:
453       idict["hvparams"].update(hvp)
454     idict["beparams"] = cluster.FillBE(instance)
455     if bep is not None:
456       idict["beparams"].update(bep)
457     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458     if osp is not None:
459       idict["osparams"].update(osp)
460     for nic in idict["nics"]:
461       nic['nicparams'] = objects.FillDict(
462         cluster.nicparams[constants.PP_DEFAULT],
463         nic['nicparams'])
464     return idict
465
466   def _ConnectList(self, client, node_list, call, read_timeout=None):
467     """Helper for computing node addresses.
468
469     @type client: L{ganeti.rpc.Client}
470     @param client: a C{Client} instance
471     @type node_list: list
472     @param node_list: the node list we should connect
473     @type call: string
474     @param call: the name of the remote procedure call, for filling in
475         correctly any eventual offline nodes' results
476     @type read_timeout: int
477     @param read_timeout: overwrites the default read timeout for the
478         given operation
479
480     """
481     all_nodes = self._cfg.GetAllNodesInfo()
482     name_list = []
483     addr_list = []
484     skip_dict = {}
485     for node in node_list:
486       if node in all_nodes:
487         if all_nodes[node].offline:
488           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489           continue
490         val = all_nodes[node].primary_ip
491       else:
492         val = None
493       addr_list.append(val)
494       name_list.append(node)
495     if name_list:
496       client.ConnectList(name_list, address_list=addr_list,
497                          read_timeout=read_timeout)
498     return skip_dict
499
500   def _ConnectNode(self, client, node, call, read_timeout=None):
501     """Helper for computing one node's address.
502
503     @type client: L{ganeti.rpc.Client}
504     @param client: a C{Client} instance
505     @type node: str
506     @param node: the node we should connect
507     @type call: string
508     @param call: the name of the remote procedure call, for filling in
509         correctly any eventual offline nodes' results
510     @type read_timeout: int
511     @param read_timeout: overwrites the default read timeout for the
512         given operation
513
514     """
515     node_info = self._cfg.GetNodeInfo(node)
516     if node_info is not None:
517       if node_info.offline:
518         return RpcResult(node=node, offline=True, call=call)
519       addr = node_info.primary_ip
520     else:
521       addr = None
522     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523
524   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525     """Helper for making a multi-node call
526
527     """
528     body = serializer.DumpJson(args, indent=False)
529     c = Client(procedure, body, self.port)
530     skip_dict = self._ConnectList(c, node_list, procedure,
531                                   read_timeout=read_timeout)
532     skip_dict.update(c.GetResults())
533     return skip_dict
534
535   @classmethod
536   def _StaticMultiNodeCall(cls, node_list, procedure, args,
537                            address_list=None, read_timeout=None):
538     """Helper for making a multi-node static call
539
540     """
541     body = serializer.DumpJson(args, indent=False)
542     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
543     c.ConnectList(node_list, address_list=address_list,
544                   read_timeout=read_timeout)
545     return c.GetResults()
546
547   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548     """Helper for making a single-node call
549
550     """
551     body = serializer.DumpJson(args, indent=False)
552     c = Client(procedure, body, self.port)
553     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554     if result is None:
555       # we did connect, node is not offline
556       result = c.GetResults()[node]
557     return result
558
559   @classmethod
560   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561     """Helper for making a single-node static call
562
563     """
564     body = serializer.DumpJson(args, indent=False)
565     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
566     c.ConnectNode(node, read_timeout=read_timeout)
567     return c.GetResults()[node]
568
569   @staticmethod
570   def _Compress(data):
571     """Compresses a string for transport over RPC.
572
573     Small amounts of data are not compressed.
574
575     @type data: str
576     @param data: Data
577     @rtype: tuple
578     @return: Encoded data to send
579
580     """
581     # Small amounts of data are not compressed
582     if len(data) < 512:
583       return (constants.RPC_ENCODING_NONE, data)
584
585     # Compress with zlib and encode in base64
586     return (constants.RPC_ENCODING_ZLIB_BASE64,
587             base64.b64encode(zlib.compress(data, 3)))
588
589   #
590   # Begin RPC calls
591   #
592
593   @_RpcTimeout(_TMO_URGENT)
594   def call_bdev_sizes(self, node_list, devices):
595     """Gets the sizes of requested block devices present on a node
596
597     This is a multi-node call.
598
599     """
600     return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
601
602   @_RpcTimeout(_TMO_URGENT)
603   def call_lv_list(self, node_list, vg_name):
604     """Gets the logical volumes present in a given volume group.
605
606     This is a multi-node call.
607
608     """
609     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
610
611   @_RpcTimeout(_TMO_URGENT)
612   def call_vg_list(self, node_list):
613     """Gets the volume group list.
614
615     This is a multi-node call.
616
617     """
618     return self._MultiNodeCall(node_list, "vg_list", [])
619
620   @_RpcTimeout(_TMO_NORMAL)
621   def call_storage_list(self, node_list, su_name, su_args, name, fields):
622     """Get list of storage units.
623
624     This is a multi-node call.
625
626     """
627     return self._MultiNodeCall(node_list, "storage_list",
628                                [su_name, su_args, name, fields])
629
630   @_RpcTimeout(_TMO_NORMAL)
631   def call_storage_modify(self, node, su_name, su_args, name, changes):
632     """Modify a storage unit.
633
634     This is a single-node call.
635
636     """
637     return self._SingleNodeCall(node, "storage_modify",
638                                 [su_name, su_args, name, changes])
639
640   @_RpcTimeout(_TMO_NORMAL)
641   def call_storage_execute(self, node, su_name, su_args, name, op):
642     """Executes an operation on a storage unit.
643
644     This is a single-node call.
645
646     """
647     return self._SingleNodeCall(node, "storage_execute",
648                                 [su_name, su_args, name, op])
649
650   @_RpcTimeout(_TMO_URGENT)
651   def call_bridges_exist(self, node, bridges_list):
652     """Checks if a node has all the bridges given.
653
654     This method checks if all bridges given in the bridges_list are
655     present on the remote node, so that an instance that uses interfaces
656     on those bridges can be started.
657
658     This is a single-node call.
659
660     """
661     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
662
663   @_RpcTimeout(_TMO_NORMAL)
664   def call_instance_start(self, node, instance, hvp, bep, startup_paused):
665     """Starts an instance.
666
667     This is a single-node call.
668
669     """
670     idict = self._InstDict(instance, hvp=hvp, bep=bep)
671     return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
672
673   @_RpcTimeout(_TMO_NORMAL)
674   def call_instance_shutdown(self, node, instance, timeout):
675     """Stops an instance.
676
677     This is a single-node call.
678
679     """
680     return self._SingleNodeCall(node, "instance_shutdown",
681                                 [self._InstDict(instance), timeout])
682
683   @_RpcTimeout(_TMO_NORMAL)
684   def call_migration_info(self, node, instance):
685     """Gather the information necessary to prepare an instance migration.
686
687     This is a single-node call.
688
689     @type node: string
690     @param node: the node on which the instance is currently running
691     @type instance: C{objects.Instance}
692     @param instance: the instance definition
693
694     """
695     return self._SingleNodeCall(node, "migration_info",
696                                 [self._InstDict(instance)])
697
698   @_RpcTimeout(_TMO_NORMAL)
699   def call_accept_instance(self, node, instance, info, target):
700     """Prepare a node to accept an instance.
701
702     This is a single-node call.
703
704     @type node: string
705     @param node: the target node for the migration
706     @type instance: C{objects.Instance}
707     @param instance: the instance definition
708     @type info: opaque/hypervisor specific (string/data)
709     @param info: result for the call_migration_info call
710     @type target: string
711     @param target: target hostname (usually ip address) (on the node itself)
712
713     """
714     return self._SingleNodeCall(node, "accept_instance",
715                                 [self._InstDict(instance), info, target])
716
717   @_RpcTimeout(_TMO_NORMAL)
718   def call_finalize_migration(self, node, instance, info, success):
719     """Finalize any target-node migration specific operation.
720
721     This is called both in case of a successful migration and in case of error
722     (in which case it should abort the migration).
723
724     This is a single-node call.
725
726     @type node: string
727     @param node: the target node for the migration
728     @type instance: C{objects.Instance}
729     @param instance: the instance definition
730     @type info: opaque/hypervisor specific (string/data)
731     @param info: result for the call_migration_info call
732     @type success: boolean
733     @param success: whether the migration was a success or a failure
734
735     """
736     return self._SingleNodeCall(node, "finalize_migration",
737                                 [self._InstDict(instance), info, success])
738
739   @_RpcTimeout(_TMO_SLOW)
740   def call_instance_migrate(self, node, instance, target, live):
741     """Migrate an instance.
742
743     This is a single-node call.
744
745     @type node: string
746     @param node: the node on which the instance is currently running
747     @type instance: C{objects.Instance}
748     @param instance: the instance definition
749     @type target: string
750     @param target: the target node name
751     @type live: boolean
752     @param live: whether the migration should be done live or not (the
753         interpretation of this parameter is left to the hypervisor)
754
755     """
756     return self._SingleNodeCall(node, "instance_migrate",
757                                 [self._InstDict(instance), target, live])
758
759   @_RpcTimeout(_TMO_NORMAL)
760   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
761     """Reboots an instance.
762
763     This is a single-node call.
764
765     """
766     return self._SingleNodeCall(node, "instance_reboot",
767                                 [self._InstDict(inst), reboot_type,
768                                  shutdown_timeout])
769
770   @_RpcTimeout(_TMO_1DAY)
771   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
772     """Installs an OS on the given instance.
773
774     This is a single-node call.
775
776     """
777     return self._SingleNodeCall(node, "instance_os_add",
778                                 [self._InstDict(inst, osp=osparams),
779                                  reinstall, debug])
780
781   @_RpcTimeout(_TMO_SLOW)
782   def call_instance_run_rename(self, node, inst, old_name, debug):
783     """Run the OS rename script for an instance.
784
785     This is a single-node call.
786
787     """
788     return self._SingleNodeCall(node, "instance_run_rename",
789                                 [self._InstDict(inst), old_name, debug])
790
791   @_RpcTimeout(_TMO_URGENT)
792   def call_instance_info(self, node, instance, hname):
793     """Returns information about a single instance.
794
795     This is a single-node call.
796
797     @type node: list
798     @param node: the list of nodes to query
799     @type instance: string
800     @param instance: the instance name
801     @type hname: string
802     @param hname: the hypervisor type of the instance
803
804     """
805     return self._SingleNodeCall(node, "instance_info", [instance, hname])
806
807   @_RpcTimeout(_TMO_NORMAL)
808   def call_instance_migratable(self, node, instance):
809     """Checks whether the given instance can be migrated.
810
811     This is a single-node call.
812
813     @param node: the node to query
814     @type instance: L{objects.Instance}
815     @param instance: the instance to check
816
817
818     """
819     return self._SingleNodeCall(node, "instance_migratable",
820                                 [self._InstDict(instance)])
821
822   @_RpcTimeout(_TMO_URGENT)
823   def call_all_instances_info(self, node_list, hypervisor_list):
824     """Returns information about all instances on the given nodes.
825
826     This is a multi-node call.
827
828     @type node_list: list
829     @param node_list: the list of nodes to query
830     @type hypervisor_list: list
831     @param hypervisor_list: the hypervisors to query for instances
832
833     """
834     return self._MultiNodeCall(node_list, "all_instances_info",
835                                [hypervisor_list])
836
837   @_RpcTimeout(_TMO_URGENT)
838   def call_instance_list(self, node_list, hypervisor_list):
839     """Returns the list of running instances on a given node.
840
841     This is a multi-node call.
842
843     @type node_list: list
844     @param node_list: the list of nodes to query
845     @type hypervisor_list: list
846     @param hypervisor_list: the hypervisors to query for instances
847
848     """
849     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
850
851   @_RpcTimeout(_TMO_FAST)
852   def call_node_tcp_ping(self, node, source, target, port, timeout,
853                          live_port_needed):
854     """Do a TcpPing on the remote node
855
856     This is a single-node call.
857
858     """
859     return self._SingleNodeCall(node, "node_tcp_ping",
860                                 [source, target, port, timeout,
861                                  live_port_needed])
862
863   @_RpcTimeout(_TMO_FAST)
864   def call_node_has_ip_address(self, node, address):
865     """Checks if a node has the given IP address.
866
867     This is a single-node call.
868
869     """
870     return self._SingleNodeCall(node, "node_has_ip_address", [address])
871
872   @_RpcTimeout(_TMO_URGENT)
873   def call_node_info(self, node_list, vg_name, hypervisor_type):
874     """Return node information.
875
876     This will return memory information and volume group size and free
877     space.
878
879     This is a multi-node call.
880
881     @type node_list: list
882     @param node_list: the list of nodes to query
883     @type vg_name: C{string}
884     @param vg_name: the name of the volume group to ask for disk space
885         information
886     @type hypervisor_type: C{str}
887     @param hypervisor_type: the name of the hypervisor to ask for
888         memory information
889
890     """
891     return self._MultiNodeCall(node_list, "node_info",
892                                [vg_name, hypervisor_type])
893
894   @_RpcTimeout(_TMO_NORMAL)
895   def call_etc_hosts_modify(self, node, mode, name, ip):
896     """Modify hosts file with name
897
898     @type node: string
899     @param node: The node to call
900     @type mode: string
901     @param mode: The mode to operate. Currently "add" or "remove"
902     @type name: string
903     @param name: The host name to be modified
904     @type ip: string
905     @param ip: The ip of the entry (just valid if mode is "add")
906
907     """
908     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
909
910   @_RpcTimeout(_TMO_NORMAL)
911   def call_node_verify(self, node_list, checkdict, cluster_name):
912     """Request verification of given parameters.
913
914     This is a multi-node call.
915
916     """
917     return self._MultiNodeCall(node_list, "node_verify",
918                                [checkdict, cluster_name])
919
920   @classmethod
921   @_RpcTimeout(_TMO_FAST)
922   def call_node_start_master_daemons(cls, node, no_voting):
923     """Starts master daemons on a node.
924
925     This is a single-node call.
926
927     """
928     return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
929                                      [no_voting])
930
931   @classmethod
932   @_RpcTimeout(_TMO_FAST)
933   def call_node_activate_master_ip(cls, node):
934     """Activates master IP on a node.
935
936     This is a single-node call.
937
938     """
939     return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
940
941   @classmethod
942   @_RpcTimeout(_TMO_FAST)
943   def call_node_stop_master(cls, node):
944     """Deactivates master IP and stops master daemons on a node.
945
946     This is a single-node call.
947
948     """
949     return cls._StaticSingleNodeCall(node, "node_stop_master", [])
950
951   @classmethod
952   @_RpcTimeout(_TMO_FAST)
953   def call_node_deactivate_master_ip(cls, node):
954     """Deactivates master IP on a node.
955
956     This is a single-node call.
957
958     """
959     return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
960
961   @classmethod
962   @_RpcTimeout(_TMO_URGENT)
963   def call_master_info(cls, node_list):
964     """Query master info.
965
966     This is a multi-node call.
967
968     """
969     # TODO: should this method query down nodes?
970     return cls._StaticMultiNodeCall(node_list, "master_info", [])
971
972   @classmethod
973   @_RpcTimeout(_TMO_URGENT)
974   def call_version(cls, node_list):
975     """Query node version.
976
977     This is a multi-node call.
978
979     """
980     return cls._StaticMultiNodeCall(node_list, "version", [])
981
982   @_RpcTimeout(_TMO_NORMAL)
983   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
984     """Request creation of a given block device.
985
986     This is a single-node call.
987
988     """
989     return self._SingleNodeCall(node, "blockdev_create",
990                                 [bdev.ToDict(), size, owner, on_primary, info])
991
992   @_RpcTimeout(_TMO_SLOW)
993   def call_blockdev_wipe(self, node, bdev, offset, size):
994     """Request wipe at given offset with given size of a block device.
995
996     This is a single-node call.
997
998     """
999     return self._SingleNodeCall(node, "blockdev_wipe",
1000                                 [bdev.ToDict(), offset, size])
1001
1002   @_RpcTimeout(_TMO_NORMAL)
1003   def call_blockdev_remove(self, node, bdev):
1004     """Request removal of a given block device.
1005
1006     This is a single-node call.
1007
1008     """
1009     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1010
1011   @_RpcTimeout(_TMO_NORMAL)
1012   def call_blockdev_rename(self, node, devlist):
1013     """Request rename of the given block devices.
1014
1015     This is a single-node call.
1016
1017     """
1018     return self._SingleNodeCall(node, "blockdev_rename",
1019                                 [(d.ToDict(), uid) for d, uid in devlist])
1020
1021   @_RpcTimeout(_TMO_NORMAL)
1022   def call_blockdev_pause_resume_sync(self, node, disks, pause):
1023     """Request a pause/resume of given block device.
1024
1025     This is a single-node call.
1026
1027     """
1028     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1029                                 [[bdev.ToDict() for bdev in disks], pause])
1030
1031   @_RpcTimeout(_TMO_NORMAL)
1032   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1033     """Request assembling of a given block device.
1034
1035     This is a single-node call.
1036
1037     """
1038     return self._SingleNodeCall(node, "blockdev_assemble",
1039                                 [disk.ToDict(), owner, on_primary, idx])
1040
1041   @_RpcTimeout(_TMO_NORMAL)
1042   def call_blockdev_shutdown(self, node, disk):
1043     """Request shutdown of a given block device.
1044
1045     This is a single-node call.
1046
1047     """
1048     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1049
1050   @_RpcTimeout(_TMO_NORMAL)
1051   def call_blockdev_addchildren(self, node, bdev, ndevs):
1052     """Request adding a list of children to a (mirroring) device.
1053
1054     This is a single-node call.
1055
1056     """
1057     return self._SingleNodeCall(node, "blockdev_addchildren",
1058                                 [bdev.ToDict(),
1059                                  [disk.ToDict() for disk in ndevs]])
1060
1061   @_RpcTimeout(_TMO_NORMAL)
1062   def call_blockdev_removechildren(self, node, bdev, ndevs):
1063     """Request removing a list of children from a (mirroring) device.
1064
1065     This is a single-node call.
1066
1067     """
1068     return self._SingleNodeCall(node, "blockdev_removechildren",
1069                                 [bdev.ToDict(),
1070                                  [disk.ToDict() for disk in ndevs]])
1071
1072   @_RpcTimeout(_TMO_NORMAL)
1073   def call_blockdev_getmirrorstatus(self, node, disks):
1074     """Request status of a (mirroring) device.
1075
1076     This is a single-node call.
1077
1078     """
1079     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1080                                   [dsk.ToDict() for dsk in disks])
1081     if not result.fail_msg:
1082       result.payload = [objects.BlockDevStatus.FromDict(i)
1083                         for i in result.payload]
1084     return result
1085
1086   @_RpcTimeout(_TMO_NORMAL)
1087   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1088     """Request status of (mirroring) devices from multiple nodes.
1089
1090     This is a multi-node call.
1091
1092     """
1093     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1094                                  [dict((name, [dsk.ToDict() for dsk in disks])
1095                                        for name, disks in node_disks.items())])
1096     for nres in result.values():
1097       if nres.fail_msg:
1098         continue
1099
1100       for idx, (success, status) in enumerate(nres.payload):
1101         if success:
1102           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1103
1104     return result
1105
1106   @_RpcTimeout(_TMO_NORMAL)
1107   def call_blockdev_find(self, node, disk):
1108     """Request identification of a given block device.
1109
1110     This is a single-node call.
1111
1112     """
1113     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1114     if not result.fail_msg and result.payload is not None:
1115       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1116     return result
1117
1118   @_RpcTimeout(_TMO_NORMAL)
1119   def call_blockdev_close(self, node, instance_name, disks):
1120     """Closes the given block devices.
1121
1122     This is a single-node call.
1123
1124     """
1125     params = [instance_name, [cf.ToDict() for cf in disks]]
1126     return self._SingleNodeCall(node, "blockdev_close", params)
1127
1128   @_RpcTimeout(_TMO_NORMAL)
1129   def call_blockdev_getsize(self, node, disks):
1130     """Returns the size of the given disks.
1131
1132     This is a single-node call.
1133
1134     """
1135     params = [[cf.ToDict() for cf in disks]]
1136     return self._SingleNodeCall(node, "blockdev_getsize", params)
1137
1138   @_RpcTimeout(_TMO_NORMAL)
1139   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1140     """Disconnects the network of the given drbd devices.
1141
1142     This is a multi-node call.
1143
1144     """
1145     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1146                                [nodes_ip, [cf.ToDict() for cf in disks]])
1147
1148   @_RpcTimeout(_TMO_NORMAL)
1149   def call_drbd_attach_net(self, node_list, nodes_ip,
1150                            disks, instance_name, multimaster):
1151     """Disconnects the given drbd devices.
1152
1153     This is a multi-node call.
1154
1155     """
1156     return self._MultiNodeCall(node_list, "drbd_attach_net",
1157                                [nodes_ip, [cf.ToDict() for cf in disks],
1158                                 instance_name, multimaster])
1159
1160   @_RpcTimeout(_TMO_SLOW)
1161   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1162     """Waits for the synchronization of drbd devices is complete.
1163
1164     This is a multi-node call.
1165
1166     """
1167     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1168                                [nodes_ip, [cf.ToDict() for cf in disks]])
1169
1170   @_RpcTimeout(_TMO_URGENT)
1171   def call_drbd_helper(self, node_list):
1172     """Gets drbd helper.
1173
1174     This is a multi-node call.
1175
1176     """
1177     return self._MultiNodeCall(node_list, "drbd_helper", [])
1178
1179   @classmethod
1180   @_RpcTimeout(_TMO_NORMAL)
1181   def call_upload_file(cls, node_list, file_name, address_list=None):
1182     """Upload a file.
1183
1184     The node will refuse the operation in case the file is not on the
1185     approved file list.
1186
1187     This is a multi-node call.
1188
1189     @type node_list: list
1190     @param node_list: the list of node names to upload to
1191     @type file_name: str
1192     @param file_name: the filename to upload
1193     @type address_list: list or None
1194     @keyword address_list: an optional list of node addresses, in order
1195         to optimize the RPC speed
1196
1197     """
1198     file_contents = utils.ReadFile(file_name)
1199     data = cls._Compress(file_contents)
1200     st = os.stat(file_name)
1201     getents = runtime.GetEnts()
1202     params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1203               getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1204     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1205                                     address_list=address_list)
1206
1207   @classmethod
1208   @_RpcTimeout(_TMO_NORMAL)
1209   def call_write_ssconf_files(cls, node_list, values):
1210     """Write ssconf files.
1211
1212     This is a multi-node call.
1213
1214     """
1215     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1216
1217   @_RpcTimeout(_TMO_NORMAL)
1218   def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1219     """Runs OOB.
1220
1221     This is a single-node call.
1222
1223     """
1224     return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1225                                                   remote_node, timeout])
1226
1227   @_RpcTimeout(_TMO_FAST)
1228   def call_os_diagnose(self, node_list):
1229     """Request a diagnose of OS definitions.
1230
1231     This is a multi-node call.
1232
1233     """
1234     return self._MultiNodeCall(node_list, "os_diagnose", [])
1235
1236   @_RpcTimeout(_TMO_FAST)
1237   def call_os_get(self, node, name):
1238     """Returns an OS definition.
1239
1240     This is a single-node call.
1241
1242     """
1243     result = self._SingleNodeCall(node, "os_get", [name])
1244     if not result.fail_msg and isinstance(result.payload, dict):
1245       result.payload = objects.OS.FromDict(result.payload)
1246     return result
1247
1248   @_RpcTimeout(_TMO_FAST)
1249   def call_os_validate(self, required, nodes, name, checks, params):
1250     """Run a validation routine for a given OS.
1251
1252     This is a multi-node call.
1253
1254     """
1255     return self._MultiNodeCall(nodes, "os_validate",
1256                                [required, name, checks, params])
1257
1258   @_RpcTimeout(_TMO_NORMAL)
1259   def call_hooks_runner(self, node_list, hpath, phase, env):
1260     """Call the hooks runner.
1261
1262     Args:
1263       - op: the OpCode instance
1264       - env: a dictionary with the environment
1265
1266     This is a multi-node call.
1267
1268     """
1269     params = [hpath, phase, env]
1270     return self._MultiNodeCall(node_list, "hooks_runner", params)
1271
1272   @_RpcTimeout(_TMO_NORMAL)
1273   def call_iallocator_runner(self, node, name, idata):
1274     """Call an iallocator on a remote node
1275
1276     Args:
1277       - name: the iallocator name
1278       - input: the json-encoded input string
1279
1280     This is a single-node call.
1281
1282     """
1283     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1284
1285   @_RpcTimeout(_TMO_NORMAL)
1286   def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1287     """Request a snapshot of the given block device.
1288
1289     This is a single-node call.
1290
1291     """
1292     return self._SingleNodeCall(node, "blockdev_grow",
1293                                 [cf_bdev.ToDict(), amount, dryrun])
1294
1295   @_RpcTimeout(_TMO_1DAY)
1296   def call_blockdev_export(self, node, cf_bdev,
1297                            dest_node, dest_path, cluster_name):
1298     """Export a given disk to another node.
1299
1300     This is a single-node call.
1301
1302     """
1303     return self._SingleNodeCall(node, "blockdev_export",
1304                                 [cf_bdev.ToDict(), dest_node, dest_path,
1305                                  cluster_name])
1306
1307   @_RpcTimeout(_TMO_NORMAL)
1308   def call_blockdev_snapshot(self, node, cf_bdev):
1309     """Request a snapshot of the given block device.
1310
1311     This is a single-node call.
1312
1313     """
1314     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1315
1316   @_RpcTimeout(_TMO_NORMAL)
1317   def call_finalize_export(self, node, instance, snap_disks):
1318     """Request the completion of an export operation.
1319
1320     This writes the export config file, etc.
1321
1322     This is a single-node call.
1323
1324     """
1325     flat_disks = []
1326     for disk in snap_disks:
1327       if isinstance(disk, bool):
1328         flat_disks.append(disk)
1329       else:
1330         flat_disks.append(disk.ToDict())
1331
1332     return self._SingleNodeCall(node, "finalize_export",
1333                                 [self._InstDict(instance), flat_disks])
1334
1335   @_RpcTimeout(_TMO_FAST)
1336   def call_export_info(self, node, path):
1337     """Queries the export information in a given path.
1338
1339     This is a single-node call.
1340
1341     """
1342     return self._SingleNodeCall(node, "export_info", [path])
1343
1344   @_RpcTimeout(_TMO_FAST)
1345   def call_export_list(self, node_list):
1346     """Gets the stored exports list.
1347
1348     This is a multi-node call.
1349
1350     """
1351     return self._MultiNodeCall(node_list, "export_list", [])
1352
1353   @_RpcTimeout(_TMO_FAST)
1354   def call_export_remove(self, node, export):
1355     """Requests removal of a given export.
1356
1357     This is a single-node call.
1358
1359     """
1360     return self._SingleNodeCall(node, "export_remove", [export])
1361
1362   @classmethod
1363   @_RpcTimeout(_TMO_NORMAL)
1364   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1365     """Requests a node to clean the cluster information it has.
1366
1367     This will remove the configuration information from the ganeti data
1368     dir.
1369
1370     This is a single-node call.
1371
1372     """
1373     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1374                                      [modify_ssh_setup])
1375
1376   @_RpcTimeout(_TMO_FAST)
1377   def call_node_volumes(self, node_list):
1378     """Gets all volumes on node(s).
1379
1380     This is a multi-node call.
1381
1382     """
1383     return self._MultiNodeCall(node_list, "node_volumes", [])
1384
1385   @_RpcTimeout(_TMO_FAST)
1386   def call_node_demote_from_mc(self, node):
1387     """Demote a node from the master candidate role.
1388
1389     This is a single-node call.
1390
1391     """
1392     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1393
1394   @_RpcTimeout(_TMO_NORMAL)
1395   def call_node_powercycle(self, node, hypervisor):
1396     """Tries to powercycle a node.
1397
1398     This is a single-node call.
1399
1400     """
1401     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1402
1403   @_RpcTimeout(None)
1404   def call_test_delay(self, node_list, duration):
1405     """Sleep for a fixed time on given node(s).
1406
1407     This is a multi-node call.
1408
1409     """
1410     return self._MultiNodeCall(node_list, "test_delay", [duration],
1411                                read_timeout=int(duration + 5))
1412
1413   @_RpcTimeout(_TMO_FAST)
1414   def call_file_storage_dir_create(self, node, file_storage_dir):
1415     """Create the given file storage directory.
1416
1417     This is a single-node call.
1418
1419     """
1420     return self._SingleNodeCall(node, "file_storage_dir_create",
1421                                 [file_storage_dir])
1422
1423   @_RpcTimeout(_TMO_FAST)
1424   def call_file_storage_dir_remove(self, node, file_storage_dir):
1425     """Remove the given file storage directory.
1426
1427     This is a single-node call.
1428
1429     """
1430     return self._SingleNodeCall(node, "file_storage_dir_remove",
1431                                 [file_storage_dir])
1432
1433   @_RpcTimeout(_TMO_FAST)
1434   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1435                                    new_file_storage_dir):
1436     """Rename file storage directory.
1437
1438     This is a single-node call.
1439
1440     """
1441     return self._SingleNodeCall(node, "file_storage_dir_rename",
1442                                 [old_file_storage_dir, new_file_storage_dir])
1443
1444   @classmethod
1445   @_RpcTimeout(_TMO_URGENT)
1446   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1447     """Update job queue.
1448
1449     This is a multi-node call.
1450
1451     """
1452     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1453                                     [file_name, cls._Compress(content)],
1454                                     address_list=address_list)
1455
1456   @classmethod
1457   @_RpcTimeout(_TMO_NORMAL)
1458   def call_jobqueue_purge(cls, node):
1459     """Purge job queue.
1460
1461     This is a single-node call.
1462
1463     """
1464     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1465
1466   @classmethod
1467   @_RpcTimeout(_TMO_URGENT)
1468   def call_jobqueue_rename(cls, node_list, address_list, rename):
1469     """Rename a job queue file.
1470
1471     This is a multi-node call.
1472
1473     """
1474     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1475                                     address_list=address_list)
1476
1477   @_RpcTimeout(_TMO_NORMAL)
1478   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1479     """Validate the hypervisor params.
1480
1481     This is a multi-node call.
1482
1483     @type node_list: list
1484     @param node_list: the list of nodes to query
1485     @type hvname: string
1486     @param hvname: the hypervisor name
1487     @type hvparams: dict
1488     @param hvparams: the hypervisor parameters to be validated
1489
1490     """
1491     cluster = self._cfg.GetClusterInfo()
1492     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1493     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1494                                [hvname, hv_full])
1495
1496   @_RpcTimeout(_TMO_NORMAL)
1497   def call_x509_cert_create(self, node, validity):
1498     """Creates a new X509 certificate for SSL/TLS.
1499
1500     This is a single-node call.
1501
1502     @type validity: int
1503     @param validity: Validity in seconds
1504
1505     """
1506     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1507
1508   @_RpcTimeout(_TMO_NORMAL)
1509   def call_x509_cert_remove(self, node, name):
1510     """Removes a X509 certificate.
1511
1512     This is a single-node call.
1513
1514     @type name: string
1515     @param name: Certificate name
1516
1517     """
1518     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1519
1520   @_RpcTimeout(_TMO_NORMAL)
1521   def call_import_start(self, node, opts, instance, component,
1522                         dest, dest_args):
1523     """Starts a listener for an import.
1524
1525     This is a single-node call.
1526
1527     @type node: string
1528     @param node: Node name
1529     @type instance: C{objects.Instance}
1530     @param instance: Instance object
1531     @type component: string
1532     @param component: which part of the instance is being imported
1533
1534     """
1535     return self._SingleNodeCall(node, "import_start",
1536                                 [opts.ToDict(),
1537                                  self._InstDict(instance), component, dest,
1538                                  _EncodeImportExportIO(dest, dest_args)])
1539
1540   @_RpcTimeout(_TMO_NORMAL)
1541   def call_export_start(self, node, opts, host, port,
1542                         instance, component, source, source_args):
1543     """Starts an export daemon.
1544
1545     This is a single-node call.
1546
1547     @type node: string
1548     @param node: Node name
1549     @type instance: C{objects.Instance}
1550     @param instance: Instance object
1551     @type component: string
1552     @param component: which part of the instance is being imported
1553
1554     """
1555     return self._SingleNodeCall(node, "export_start",
1556                                 [opts.ToDict(), host, port,
1557                                  self._InstDict(instance),
1558                                  component, source,
1559                                  _EncodeImportExportIO(source, source_args)])
1560
1561   @_RpcTimeout(_TMO_FAST)
1562   def call_impexp_status(self, node, names):
1563     """Gets the status of an import or export.
1564
1565     This is a single-node call.
1566
1567     @type node: string
1568     @param node: Node name
1569     @type names: List of strings
1570     @param names: Import/export names
1571     @rtype: List of L{objects.ImportExportStatus} instances
1572     @return: Returns a list of the state of each named import/export or None if
1573              a status couldn't be retrieved
1574
1575     """
1576     result = self._SingleNodeCall(node, "impexp_status", [names])
1577
1578     if not result.fail_msg:
1579       decoded = []
1580
1581       for i in result.payload:
1582         if i is None:
1583           decoded.append(None)
1584           continue
1585         decoded.append(objects.ImportExportStatus.FromDict(i))
1586
1587       result.payload = decoded
1588
1589     return result
1590
1591   @_RpcTimeout(_TMO_NORMAL)
1592   def call_impexp_abort(self, node, name):
1593     """Aborts an import or export.
1594
1595     This is a single-node call.
1596
1597     @type node: string
1598     @param node: Node name
1599     @type name: string
1600     @param name: Import/export name
1601
1602     """
1603     return self._SingleNodeCall(node, "impexp_abort", [name])
1604
1605   @_RpcTimeout(_TMO_NORMAL)
1606   def call_impexp_cleanup(self, node, name):
1607     """Cleans up after an import or export.
1608
1609     This is a single-node call.
1610
1611     @type node: string
1612     @param node: Node name
1613     @type name: string
1614     @param name: Import/export name
1615
1616     """
1617     return self._SingleNodeCall(node, "impexp_cleanup", [name])