Add OS parameters to cluster and instance objects
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52 # Various time constants for the timeout table
53 _TMO_URGENT = 60 # one minute
54 _TMO_FAST = 5 * 60 # five minutes
55 _TMO_NORMAL = 15 * 60 # 15 minutes
56 _TMO_SLOW = 3600 # one hour
57 _TMO_4HRS = 4 * 3600
58 _TMO_1DAY = 86400
59
60 # Timeout table that will be built later by decorators
61 # Guidelines for choosing timeouts:
62 # - call used during watcher: timeout -> 1min, _TMO_URGENT
63 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
64 # - other calls: 15 min, _TMO_NORMAL
65 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
66
67 _TIMEOUTS = {
68 }
69
70
71 def Init():
72   """Initializes the module-global HTTP client manager.
73
74   Must be called before using any RPC function.
75
76   """
77   global _http_manager # pylint: disable-msg=W0603
78
79   assert not _http_manager, "RPC module initialized more than once"
80
81   http.InitSsl()
82
83   _http_manager = http.client.HttpClientManager()
84
85
86 def Shutdown():
87   """Stops the module-global HTTP client manager.
88
89   Must be called before quitting the program.
90
91   """
92   global _http_manager # pylint: disable-msg=W0603
93
94   if _http_manager:
95     _http_manager.Shutdown()
96     _http_manager = None
97
98
99 def _RpcTimeout(secs):
100   """Timeout decorator.
101
102   When applied to a rpc call_* function, it updates the global timeout
103   table with the given function/timeout.
104
105   """
106   def decorator(f):
107     name = f.__name__
108     assert name.startswith("call_")
109     _TIMEOUTS[name[len("call_"):]] = secs
110     return f
111   return decorator
112
113
114 class RpcResult(object):
115   """RPC Result class.
116
117   This class holds an RPC result. It is needed since in multi-node
118   calls we can't raise an exception just because one one out of many
119   failed, and therefore we use this class to encapsulate the result.
120
121   @ivar data: the data payload, for successful results, or None
122   @ivar call: the name of the RPC call
123   @ivar node: the name of the node to which we made the call
124   @ivar offline: whether the operation failed because the node was
125       offline, as opposed to actual failure; offline=True will always
126       imply failed=True, in order to allow simpler checking if
127       the user doesn't care about the exact failure mode
128   @ivar fail_msg: the error message if the call failed
129
130   """
131   def __init__(self, data=None, failed=False, offline=False,
132                call=None, node=None):
133     self.offline = offline
134     self.call = call
135     self.node = node
136
137     if offline:
138       self.fail_msg = "Node is marked offline"
139       self.data = self.payload = None
140     elif failed:
141       self.fail_msg = self._EnsureErr(data)
142       self.data = self.payload = None
143     else:
144       self.data = data
145       if not isinstance(self.data, (tuple, list)):
146         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
147                          type(self.data))
148         self.payload = None
149       elif len(data) != 2:
150         self.fail_msg = ("RPC layer error: invalid result length (%d), "
151                          "expected 2" % len(self.data))
152         self.payload = None
153       elif not self.data[0]:
154         self.fail_msg = self._EnsureErr(self.data[1])
155         self.payload = None
156       else:
157         # finally success
158         self.fail_msg = None
159         self.payload = data[1]
160
161     assert hasattr(self, "call")
162     assert hasattr(self, "data")
163     assert hasattr(self, "fail_msg")
164     assert hasattr(self, "node")
165     assert hasattr(self, "offline")
166     assert hasattr(self, "payload")
167
168   @staticmethod
169   def _EnsureErr(val):
170     """Helper to ensure we return a 'True' value for error."""
171     if val:
172       return val
173     else:
174       return "No error information"
175
176   def Raise(self, msg, prereq=False, ecode=None):
177     """If the result has failed, raise an OpExecError.
178
179     This is used so that LU code doesn't have to check for each
180     result, but instead can call this function.
181
182     """
183     if not self.fail_msg:
184       return
185
186     if not msg: # one could pass None for default message
187       msg = ("Call '%s' to node '%s' has failed: %s" %
188              (self.call, self.node, self.fail_msg))
189     else:
190       msg = "%s: %s" % (msg, self.fail_msg)
191     if prereq:
192       ec = errors.OpPrereqError
193     else:
194       ec = errors.OpExecError
195     if ecode is not None:
196       args = (msg, ecode)
197     else:
198       args = (msg, )
199     raise ec(*args) # pylint: disable-msg=W0142
200
201
202 class Client:
203   """RPC Client class.
204
205   This class, given a (remote) method name, a list of parameters and a
206   list of nodes, will contact (in parallel) all nodes, and return a
207   dict of results (key: node name, value: result).
208
209   One current bug is that generic failure is still signaled by
210   'False' result, which is not good. This overloading of values can
211   cause bugs.
212
213   """
214   def __init__(self, procedure, body, port):
215     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
216                                     " timeouts table")
217     self.procedure = procedure
218     self.body = body
219     self.port = port
220     self.nc = {}
221
222     self._ssl_params = \
223       http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
224                          ssl_cert_path=constants.NODED_CERT_FILE)
225
226   def ConnectList(self, node_list, address_list=None, read_timeout=None):
227     """Add a list of nodes to the target nodes.
228
229     @type node_list: list
230     @param node_list: the list of node names to connect
231     @type address_list: list or None
232     @keyword address_list: either None or a list with node addresses,
233         which must have the same length as the node list
234     @type read_timeout: int
235     @param read_timeout: overwrites the default read timeout for the
236         given operation
237
238     """
239     if address_list is None:
240       address_list = [None for _ in node_list]
241     else:
242       assert len(node_list) == len(address_list), \
243              "Name and address lists should have the same length"
244     for node, address in zip(node_list, address_list):
245       self.ConnectNode(node, address, read_timeout=read_timeout)
246
247   def ConnectNode(self, name, address=None, read_timeout=None):
248     """Add a node to the target list.
249
250     @type name: str
251     @param name: the node name
252     @type address: str
253     @keyword address: the node address, if known
254
255     """
256     if address is None:
257       address = name
258
259     if read_timeout is None:
260       read_timeout = _TIMEOUTS[self.procedure]
261
262     self.nc[name] = \
263       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
264                                     "/%s" % self.procedure,
265                                     post_data=self.body,
266                                     ssl_params=self._ssl_params,
267                                     ssl_verify_peer=True,
268                                     read_timeout=read_timeout)
269
270   def GetResults(self):
271     """Call nodes and return results.
272
273     @rtype: list
274     @return: List of RPC results
275
276     """
277     assert _http_manager, "RPC module not initialized"
278
279     _http_manager.ExecRequests(self.nc.values())
280
281     results = {}
282
283     for name, req in self.nc.iteritems():
284       if req.success and req.resp_status_code == http.HTTP_OK:
285         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
286                                   node=name, call=self.procedure)
287         continue
288
289       # TODO: Better error reporting
290       if req.error:
291         msg = req.error
292       else:
293         msg = req.resp_body
294
295       logging.error("RPC error in %s from node %s: %s",
296                     self.procedure, name, msg)
297       results[name] = RpcResult(data=msg, failed=True, node=name,
298                                 call=self.procedure)
299
300     return results
301
302
303 def _EncodeImportExportIO(ieio, ieioargs):
304   """Encodes import/export I/O information.
305
306   """
307   if ieio == constants.IEIO_RAW_DISK:
308     assert len(ieioargs) == 1
309     return (ieioargs[0].ToDict(), )
310
311   if ieio == constants.IEIO_SCRIPT:
312     assert len(ieioargs) == 2
313     return (ieioargs[0].ToDict(), ieioargs[1])
314
315   return ieioargs
316
317
318 class RpcRunner(object):
319   """RPC runner class"""
320
321   def __init__(self, cfg):
322     """Initialized the rpc runner.
323
324     @type cfg:  C{config.ConfigWriter}
325     @param cfg: the configuration object that will be used to get data
326                 about the cluster
327
328     """
329     self._cfg = cfg
330     self.port = utils.GetDaemonPort(constants.NODED)
331
332   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
333     """Convert the given instance to a dict.
334
335     This is done via the instance's ToDict() method and additionally
336     we fill the hvparams with the cluster defaults.
337
338     @type instance: L{objects.Instance}
339     @param instance: an Instance object
340     @type hvp: dict or None
341     @param hvp: a dictionary with overridden hypervisor parameters
342     @type bep: dict or None
343     @param bep: a dictionary with overridden backend parameters
344     @type osp: dict or None
345     @param osp: a dictionary with overriden os parameters
346     @rtype: dict
347     @return: the instance dict, with the hvparams filled with the
348         cluster defaults
349
350     """
351     idict = instance.ToDict()
352     cluster = self._cfg.GetClusterInfo()
353     idict["hvparams"] = cluster.FillHV(instance)
354     if hvp is not None:
355       idict["hvparams"].update(hvp)
356     idict["beparams"] = cluster.FillBE(instance)
357     if bep is not None:
358       idict["beparams"].update(bep)
359     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
360     if osp is not None:
361       idict["osparams"].update(osp)
362     for nic in idict["nics"]:
363       nic['nicparams'] = objects.FillDict(
364         cluster.nicparams[constants.PP_DEFAULT],
365         nic['nicparams'])
366     return idict
367
368   def _ConnectList(self, client, node_list, call, read_timeout=None):
369     """Helper for computing node addresses.
370
371     @type client: L{ganeti.rpc.Client}
372     @param client: a C{Client} instance
373     @type node_list: list
374     @param node_list: the node list we should connect
375     @type call: string
376     @param call: the name of the remote procedure call, for filling in
377         correctly any eventual offline nodes' results
378     @type read_timeout: int
379     @param read_timeout: overwrites the default read timeout for the
380         given operation
381
382     """
383     all_nodes = self._cfg.GetAllNodesInfo()
384     name_list = []
385     addr_list = []
386     skip_dict = {}
387     for node in node_list:
388       if node in all_nodes:
389         if all_nodes[node].offline:
390           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
391           continue
392         val = all_nodes[node].primary_ip
393       else:
394         val = None
395       addr_list.append(val)
396       name_list.append(node)
397     if name_list:
398       client.ConnectList(name_list, address_list=addr_list,
399                          read_timeout=read_timeout)
400     return skip_dict
401
402   def _ConnectNode(self, client, node, call, read_timeout=None):
403     """Helper for computing one node's address.
404
405     @type client: L{ganeti.rpc.Client}
406     @param client: a C{Client} instance
407     @type node: str
408     @param node: the node we should connect
409     @type call: string
410     @param call: the name of the remote procedure call, for filling in
411         correctly any eventual offline nodes' results
412     @type read_timeout: int
413     @param read_timeout: overwrites the default read timeout for the
414         given operation
415
416     """
417     node_info = self._cfg.GetNodeInfo(node)
418     if node_info is not None:
419       if node_info.offline:
420         return RpcResult(node=node, offline=True, call=call)
421       addr = node_info.primary_ip
422     else:
423       addr = None
424     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
425
426   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
427     """Helper for making a multi-node call
428
429     """
430     body = serializer.DumpJson(args, indent=False)
431     c = Client(procedure, body, self.port)
432     skip_dict = self._ConnectList(c, node_list, procedure,
433                                   read_timeout=read_timeout)
434     skip_dict.update(c.GetResults())
435     return skip_dict
436
437   @classmethod
438   def _StaticMultiNodeCall(cls, node_list, procedure, args,
439                            address_list=None, read_timeout=None):
440     """Helper for making a multi-node static call
441
442     """
443     body = serializer.DumpJson(args, indent=False)
444     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
445     c.ConnectList(node_list, address_list=address_list,
446                   read_timeout=read_timeout)
447     return c.GetResults()
448
449   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
450     """Helper for making a single-node call
451
452     """
453     body = serializer.DumpJson(args, indent=False)
454     c = Client(procedure, body, self.port)
455     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
456     if result is None:
457       # we did connect, node is not offline
458       result = c.GetResults()[node]
459     return result
460
461   @classmethod
462   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
463     """Helper for making a single-node static call
464
465     """
466     body = serializer.DumpJson(args, indent=False)
467     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
468     c.ConnectNode(node, read_timeout=read_timeout)
469     return c.GetResults()[node]
470
471   @staticmethod
472   def _Compress(data):
473     """Compresses a string for transport over RPC.
474
475     Small amounts of data are not compressed.
476
477     @type data: str
478     @param data: Data
479     @rtype: tuple
480     @return: Encoded data to send
481
482     """
483     # Small amounts of data are not compressed
484     if len(data) < 512:
485       return (constants.RPC_ENCODING_NONE, data)
486
487     # Compress with zlib and encode in base64
488     return (constants.RPC_ENCODING_ZLIB_BASE64,
489             base64.b64encode(zlib.compress(data, 3)))
490
491   #
492   # Begin RPC calls
493   #
494
495   @_RpcTimeout(_TMO_URGENT)
496   def call_lv_list(self, node_list, vg_name):
497     """Gets the logical volumes present in a given volume group.
498
499     This is a multi-node call.
500
501     """
502     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
503
504   @_RpcTimeout(_TMO_URGENT)
505   def call_vg_list(self, node_list):
506     """Gets the volume group list.
507
508     This is a multi-node call.
509
510     """
511     return self._MultiNodeCall(node_list, "vg_list", [])
512
513   @_RpcTimeout(_TMO_NORMAL)
514   def call_storage_list(self, node_list, su_name, su_args, name, fields):
515     """Get list of storage units.
516
517     This is a multi-node call.
518
519     """
520     return self._MultiNodeCall(node_list, "storage_list",
521                                [su_name, su_args, name, fields])
522
523   @_RpcTimeout(_TMO_NORMAL)
524   def call_storage_modify(self, node, su_name, su_args, name, changes):
525     """Modify a storage unit.
526
527     This is a single-node call.
528
529     """
530     return self._SingleNodeCall(node, "storage_modify",
531                                 [su_name, su_args, name, changes])
532
533   @_RpcTimeout(_TMO_NORMAL)
534   def call_storage_execute(self, node, su_name, su_args, name, op):
535     """Executes an operation on a storage unit.
536
537     This is a single-node call.
538
539     """
540     return self._SingleNodeCall(node, "storage_execute",
541                                 [su_name, su_args, name, op])
542
543   @_RpcTimeout(_TMO_URGENT)
544   def call_bridges_exist(self, node, bridges_list):
545     """Checks if a node has all the bridges given.
546
547     This method checks if all bridges given in the bridges_list are
548     present on the remote node, so that an instance that uses interfaces
549     on those bridges can be started.
550
551     This is a single-node call.
552
553     """
554     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
555
556   @_RpcTimeout(_TMO_NORMAL)
557   def call_instance_start(self, node, instance, hvp, bep):
558     """Starts an instance.
559
560     This is a single-node call.
561
562     """
563     idict = self._InstDict(instance, hvp=hvp, bep=bep)
564     return self._SingleNodeCall(node, "instance_start", [idict])
565
566   @_RpcTimeout(_TMO_NORMAL)
567   def call_instance_shutdown(self, node, instance, timeout):
568     """Stops an instance.
569
570     This is a single-node call.
571
572     """
573     return self._SingleNodeCall(node, "instance_shutdown",
574                                 [self._InstDict(instance), timeout])
575
576   @_RpcTimeout(_TMO_NORMAL)
577   def call_migration_info(self, node, instance):
578     """Gather the information necessary to prepare an instance migration.
579
580     This is a single-node call.
581
582     @type node: string
583     @param node: the node on which the instance is currently running
584     @type instance: C{objects.Instance}
585     @param instance: the instance definition
586
587     """
588     return self._SingleNodeCall(node, "migration_info",
589                                 [self._InstDict(instance)])
590
591   @_RpcTimeout(_TMO_NORMAL)
592   def call_accept_instance(self, node, instance, info, target):
593     """Prepare a node to accept an instance.
594
595     This is a single-node call.
596
597     @type node: string
598     @param node: the target node for the migration
599     @type instance: C{objects.Instance}
600     @param instance: the instance definition
601     @type info: opaque/hypervisor specific (string/data)
602     @param info: result for the call_migration_info call
603     @type target: string
604     @param target: target hostname (usually ip address) (on the node itself)
605
606     """
607     return self._SingleNodeCall(node, "accept_instance",
608                                 [self._InstDict(instance), info, target])
609
610   @_RpcTimeout(_TMO_NORMAL)
611   def call_finalize_migration(self, node, instance, info, success):
612     """Finalize any target-node migration specific operation.
613
614     This is called both in case of a successful migration and in case of error
615     (in which case it should abort the migration).
616
617     This is a single-node call.
618
619     @type node: string
620     @param node: the target node for the migration
621     @type instance: C{objects.Instance}
622     @param instance: the instance definition
623     @type info: opaque/hypervisor specific (string/data)
624     @param info: result for the call_migration_info call
625     @type success: boolean
626     @param success: whether the migration was a success or a failure
627
628     """
629     return self._SingleNodeCall(node, "finalize_migration",
630                                 [self._InstDict(instance), info, success])
631
632   @_RpcTimeout(_TMO_SLOW)
633   def call_instance_migrate(self, node, instance, target, live):
634     """Migrate an instance.
635
636     This is a single-node call.
637
638     @type node: string
639     @param node: the node on which the instance is currently running
640     @type instance: C{objects.Instance}
641     @param instance: the instance definition
642     @type target: string
643     @param target: the target node name
644     @type live: boolean
645     @param live: whether the migration should be done live or not (the
646         interpretation of this parameter is left to the hypervisor)
647
648     """
649     return self._SingleNodeCall(node, "instance_migrate",
650                                 [self._InstDict(instance), target, live])
651
652   @_RpcTimeout(_TMO_NORMAL)
653   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
654     """Reboots an instance.
655
656     This is a single-node call.
657
658     """
659     return self._SingleNodeCall(node, "instance_reboot",
660                                 [self._InstDict(inst), reboot_type,
661                                  shutdown_timeout])
662
663   @_RpcTimeout(_TMO_1DAY)
664   def call_instance_os_add(self, node, inst, reinstall, debug):
665     """Installs an OS on the given instance.
666
667     This is a single-node call.
668
669     """
670     return self._SingleNodeCall(node, "instance_os_add",
671                                 [self._InstDict(inst), reinstall, debug])
672
673   @_RpcTimeout(_TMO_SLOW)
674   def call_instance_run_rename(self, node, inst, old_name, debug):
675     """Run the OS rename script for an instance.
676
677     This is a single-node call.
678
679     """
680     return self._SingleNodeCall(node, "instance_run_rename",
681                                 [self._InstDict(inst), old_name, debug])
682
683   @_RpcTimeout(_TMO_URGENT)
684   def call_instance_info(self, node, instance, hname):
685     """Returns information about a single instance.
686
687     This is a single-node call.
688
689     @type node: list
690     @param node: the list of nodes to query
691     @type instance: string
692     @param instance: the instance name
693     @type hname: string
694     @param hname: the hypervisor type of the instance
695
696     """
697     return self._SingleNodeCall(node, "instance_info", [instance, hname])
698
699   @_RpcTimeout(_TMO_NORMAL)
700   def call_instance_migratable(self, node, instance):
701     """Checks whether the given instance can be migrated.
702
703     This is a single-node call.
704
705     @param node: the node to query
706     @type instance: L{objects.Instance}
707     @param instance: the instance to check
708
709
710     """
711     return self._SingleNodeCall(node, "instance_migratable",
712                                 [self._InstDict(instance)])
713
714   @_RpcTimeout(_TMO_URGENT)
715   def call_all_instances_info(self, node_list, hypervisor_list):
716     """Returns information about all instances on the given nodes.
717
718     This is a multi-node call.
719
720     @type node_list: list
721     @param node_list: the list of nodes to query
722     @type hypervisor_list: list
723     @param hypervisor_list: the hypervisors to query for instances
724
725     """
726     return self._MultiNodeCall(node_list, "all_instances_info",
727                                [hypervisor_list])
728
729   @_RpcTimeout(_TMO_URGENT)
730   def call_instance_list(self, node_list, hypervisor_list):
731     """Returns the list of running instances on a given node.
732
733     This is a multi-node call.
734
735     @type node_list: list
736     @param node_list: the list of nodes to query
737     @type hypervisor_list: list
738     @param hypervisor_list: the hypervisors to query for instances
739
740     """
741     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
742
743   @_RpcTimeout(_TMO_FAST)
744   def call_node_tcp_ping(self, node, source, target, port, timeout,
745                          live_port_needed):
746     """Do a TcpPing on the remote node
747
748     This is a single-node call.
749
750     """
751     return self._SingleNodeCall(node, "node_tcp_ping",
752                                 [source, target, port, timeout,
753                                  live_port_needed])
754
755   @_RpcTimeout(_TMO_FAST)
756   def call_node_has_ip_address(self, node, address):
757     """Checks if a node has the given IP address.
758
759     This is a single-node call.
760
761     """
762     return self._SingleNodeCall(node, "node_has_ip_address", [address])
763
764   @_RpcTimeout(_TMO_URGENT)
765   def call_node_info(self, node_list, vg_name, hypervisor_type):
766     """Return node information.
767
768     This will return memory information and volume group size and free
769     space.
770
771     This is a multi-node call.
772
773     @type node_list: list
774     @param node_list: the list of nodes to query
775     @type vg_name: C{string}
776     @param vg_name: the name of the volume group to ask for disk space
777         information
778     @type hypervisor_type: C{str}
779     @param hypervisor_type: the name of the hypervisor to ask for
780         memory information
781
782     """
783     return self._MultiNodeCall(node_list, "node_info",
784                                [vg_name, hypervisor_type])
785
786   @_RpcTimeout(_TMO_NORMAL)
787   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
788     """Add a node to the cluster.
789
790     This is a single-node call.
791
792     """
793     return self._SingleNodeCall(node, "node_add",
794                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
795
796   @_RpcTimeout(_TMO_NORMAL)
797   def call_node_verify(self, node_list, checkdict, cluster_name):
798     """Request verification of given parameters.
799
800     This is a multi-node call.
801
802     """
803     return self._MultiNodeCall(node_list, "node_verify",
804                                [checkdict, cluster_name])
805
806   @classmethod
807   @_RpcTimeout(_TMO_FAST)
808   def call_node_start_master(cls, node, start_daemons, no_voting):
809     """Tells a node to activate itself as a master.
810
811     This is a single-node call.
812
813     """
814     return cls._StaticSingleNodeCall(node, "node_start_master",
815                                      [start_daemons, no_voting])
816
817   @classmethod
818   @_RpcTimeout(_TMO_FAST)
819   def call_node_stop_master(cls, node, stop_daemons):
820     """Tells a node to demote itself from master status.
821
822     This is a single-node call.
823
824     """
825     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
826
827   @classmethod
828   @_RpcTimeout(_TMO_URGENT)
829   def call_master_info(cls, node_list):
830     """Query master info.
831
832     This is a multi-node call.
833
834     """
835     # TODO: should this method query down nodes?
836     return cls._StaticMultiNodeCall(node_list, "master_info", [])
837
838   @classmethod
839   @_RpcTimeout(_TMO_URGENT)
840   def call_version(cls, node_list):
841     """Query node version.
842
843     This is a multi-node call.
844
845     """
846     return cls._StaticMultiNodeCall(node_list, "version", [])
847
848   @_RpcTimeout(_TMO_NORMAL)
849   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
850     """Request creation of a given block device.
851
852     This is a single-node call.
853
854     """
855     return self._SingleNodeCall(node, "blockdev_create",
856                                 [bdev.ToDict(), size, owner, on_primary, info])
857
858   @_RpcTimeout(_TMO_NORMAL)
859   def call_blockdev_remove(self, node, bdev):
860     """Request removal of a given block device.
861
862     This is a single-node call.
863
864     """
865     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
866
867   @_RpcTimeout(_TMO_NORMAL)
868   def call_blockdev_rename(self, node, devlist):
869     """Request rename of the given block devices.
870
871     This is a single-node call.
872
873     """
874     return self._SingleNodeCall(node, "blockdev_rename",
875                                 [(d.ToDict(), uid) for d, uid in devlist])
876
877   @_RpcTimeout(_TMO_NORMAL)
878   def call_blockdev_assemble(self, node, disk, owner, on_primary):
879     """Request assembling of a given block device.
880
881     This is a single-node call.
882
883     """
884     return self._SingleNodeCall(node, "blockdev_assemble",
885                                 [disk.ToDict(), owner, on_primary])
886
887   @_RpcTimeout(_TMO_NORMAL)
888   def call_blockdev_shutdown(self, node, disk):
889     """Request shutdown of a given block device.
890
891     This is a single-node call.
892
893     """
894     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
895
896   @_RpcTimeout(_TMO_NORMAL)
897   def call_blockdev_addchildren(self, node, bdev, ndevs):
898     """Request adding a list of children to a (mirroring) device.
899
900     This is a single-node call.
901
902     """
903     return self._SingleNodeCall(node, "blockdev_addchildren",
904                                 [bdev.ToDict(),
905                                  [disk.ToDict() for disk in ndevs]])
906
907   @_RpcTimeout(_TMO_NORMAL)
908   def call_blockdev_removechildren(self, node, bdev, ndevs):
909     """Request removing a list of children from a (mirroring) device.
910
911     This is a single-node call.
912
913     """
914     return self._SingleNodeCall(node, "blockdev_removechildren",
915                                 [bdev.ToDict(),
916                                  [disk.ToDict() for disk in ndevs]])
917
918   @_RpcTimeout(_TMO_NORMAL)
919   def call_blockdev_getmirrorstatus(self, node, disks):
920     """Request status of a (mirroring) device.
921
922     This is a single-node call.
923
924     """
925     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
926                                   [dsk.ToDict() for dsk in disks])
927     if not result.fail_msg:
928       result.payload = [objects.BlockDevStatus.FromDict(i)
929                         for i in result.payload]
930     return result
931
932   @_RpcTimeout(_TMO_NORMAL)
933   def call_blockdev_find(self, node, disk):
934     """Request identification of a given block device.
935
936     This is a single-node call.
937
938     """
939     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
940     if not result.fail_msg and result.payload is not None:
941       result.payload = objects.BlockDevStatus.FromDict(result.payload)
942     return result
943
944   @_RpcTimeout(_TMO_NORMAL)
945   def call_blockdev_close(self, node, instance_name, disks):
946     """Closes the given block devices.
947
948     This is a single-node call.
949
950     """
951     params = [instance_name, [cf.ToDict() for cf in disks]]
952     return self._SingleNodeCall(node, "blockdev_close", params)
953
954   @_RpcTimeout(_TMO_NORMAL)
955   def call_blockdev_getsizes(self, node, disks):
956     """Returns the size of the given disks.
957
958     This is a single-node call.
959
960     """
961     params = [[cf.ToDict() for cf in disks]]
962     return self._SingleNodeCall(node, "blockdev_getsize", params)
963
964   @_RpcTimeout(_TMO_NORMAL)
965   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
966     """Disconnects the network of the given drbd devices.
967
968     This is a multi-node call.
969
970     """
971     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
972                                [nodes_ip, [cf.ToDict() for cf in disks]])
973
974   @_RpcTimeout(_TMO_NORMAL)
975   def call_drbd_attach_net(self, node_list, nodes_ip,
976                            disks, instance_name, multimaster):
977     """Disconnects the given drbd devices.
978
979     This is a multi-node call.
980
981     """
982     return self._MultiNodeCall(node_list, "drbd_attach_net",
983                                [nodes_ip, [cf.ToDict() for cf in disks],
984                                 instance_name, multimaster])
985
986   @_RpcTimeout(_TMO_SLOW)
987   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
988     """Waits for the synchronization of drbd devices is complete.
989
990     This is a multi-node call.
991
992     """
993     return self._MultiNodeCall(node_list, "drbd_wait_sync",
994                                [nodes_ip, [cf.ToDict() for cf in disks]])
995
996   @classmethod
997   @_RpcTimeout(_TMO_NORMAL)
998   def call_upload_file(cls, node_list, file_name, address_list=None):
999     """Upload a file.
1000
1001     The node will refuse the operation in case the file is not on the
1002     approved file list.
1003
1004     This is a multi-node call.
1005
1006     @type node_list: list
1007     @param node_list: the list of node names to upload to
1008     @type file_name: str
1009     @param file_name: the filename to upload
1010     @type address_list: list or None
1011     @keyword address_list: an optional list of node addresses, in order
1012         to optimize the RPC speed
1013
1014     """
1015     file_contents = utils.ReadFile(file_name)
1016     data = cls._Compress(file_contents)
1017     st = os.stat(file_name)
1018     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1019               st.st_atime, st.st_mtime]
1020     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1021                                     address_list=address_list)
1022
1023   @classmethod
1024   @_RpcTimeout(_TMO_NORMAL)
1025   def call_write_ssconf_files(cls, node_list, values):
1026     """Write ssconf files.
1027
1028     This is a multi-node call.
1029
1030     """
1031     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1032
1033   @_RpcTimeout(_TMO_FAST)
1034   def call_os_diagnose(self, node_list):
1035     """Request a diagnose of OS definitions.
1036
1037     This is a multi-node call.
1038
1039     """
1040     return self._MultiNodeCall(node_list, "os_diagnose", [])
1041
1042   @_RpcTimeout(_TMO_FAST)
1043   def call_os_get(self, node, name):
1044     """Returns an OS definition.
1045
1046     This is a single-node call.
1047
1048     """
1049     result = self._SingleNodeCall(node, "os_get", [name])
1050     if not result.fail_msg and isinstance(result.payload, dict):
1051       result.payload = objects.OS.FromDict(result.payload)
1052     return result
1053
1054   @_RpcTimeout(_TMO_FAST)
1055   def call_os_validate(self, required, nodes, name, checks, params):
1056     """Run a validation routine for a given OS.
1057
1058     This is a multi-node call.
1059
1060     """
1061     return self._MultiNodeCall(nodes, "os_validate",
1062                                [required, name, checks, params])
1063
1064   @_RpcTimeout(_TMO_NORMAL)
1065   def call_hooks_runner(self, node_list, hpath, phase, env):
1066     """Call the hooks runner.
1067
1068     Args:
1069       - op: the OpCode instance
1070       - env: a dictionary with the environment
1071
1072     This is a multi-node call.
1073
1074     """
1075     params = [hpath, phase, env]
1076     return self._MultiNodeCall(node_list, "hooks_runner", params)
1077
1078   @_RpcTimeout(_TMO_NORMAL)
1079   def call_iallocator_runner(self, node, name, idata):
1080     """Call an iallocator on a remote node
1081
1082     Args:
1083       - name: the iallocator name
1084       - input: the json-encoded input string
1085
1086     This is a single-node call.
1087
1088     """
1089     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1090
1091   @_RpcTimeout(_TMO_NORMAL)
1092   def call_blockdev_grow(self, node, cf_bdev, amount):
1093     """Request a snapshot of the given block device.
1094
1095     This is a single-node call.
1096
1097     """
1098     return self._SingleNodeCall(node, "blockdev_grow",
1099                                 [cf_bdev.ToDict(), amount])
1100
1101   @_RpcTimeout(_TMO_1DAY)
1102   def call_blockdev_export(self, node, cf_bdev,
1103                            dest_node, dest_path, cluster_name):
1104     """Export a given disk to another node.
1105
1106     This is a single-node call.
1107
1108     """
1109     return self._SingleNodeCall(node, "blockdev_export",
1110                                 [cf_bdev.ToDict(), dest_node, dest_path,
1111                                  cluster_name])
1112
1113   @_RpcTimeout(_TMO_NORMAL)
1114   def call_blockdev_snapshot(self, node, cf_bdev):
1115     """Request a snapshot of the given block device.
1116
1117     This is a single-node call.
1118
1119     """
1120     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1121
1122   @_RpcTimeout(_TMO_NORMAL)
1123   def call_finalize_export(self, node, instance, snap_disks):
1124     """Request the completion of an export operation.
1125
1126     This writes the export config file, etc.
1127
1128     This is a single-node call.
1129
1130     """
1131     flat_disks = []
1132     for disk in snap_disks:
1133       if isinstance(disk, bool):
1134         flat_disks.append(disk)
1135       else:
1136         flat_disks.append(disk.ToDict())
1137
1138     return self._SingleNodeCall(node, "finalize_export",
1139                                 [self._InstDict(instance), flat_disks])
1140
1141   @_RpcTimeout(_TMO_FAST)
1142   def call_export_info(self, node, path):
1143     """Queries the export information in a given path.
1144
1145     This is a single-node call.
1146
1147     """
1148     return self._SingleNodeCall(node, "export_info", [path])
1149
1150   @_RpcTimeout(_TMO_FAST)
1151   def call_export_list(self, node_list):
1152     """Gets the stored exports list.
1153
1154     This is a multi-node call.
1155
1156     """
1157     return self._MultiNodeCall(node_list, "export_list", [])
1158
1159   @_RpcTimeout(_TMO_FAST)
1160   def call_export_remove(self, node, export):
1161     """Requests removal of a given export.
1162
1163     This is a single-node call.
1164
1165     """
1166     return self._SingleNodeCall(node, "export_remove", [export])
1167
1168   @classmethod
1169   @_RpcTimeout(_TMO_NORMAL)
1170   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1171     """Requests a node to clean the cluster information it has.
1172
1173     This will remove the configuration information from the ganeti data
1174     dir.
1175
1176     This is a single-node call.
1177
1178     """
1179     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1180                                      [modify_ssh_setup])
1181
1182   @_RpcTimeout(_TMO_FAST)
1183   def call_node_volumes(self, node_list):
1184     """Gets all volumes on node(s).
1185
1186     This is a multi-node call.
1187
1188     """
1189     return self._MultiNodeCall(node_list, "node_volumes", [])
1190
1191   @_RpcTimeout(_TMO_FAST)
1192   def call_node_demote_from_mc(self, node):
1193     """Demote a node from the master candidate role.
1194
1195     This is a single-node call.
1196
1197     """
1198     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1199
1200   @_RpcTimeout(_TMO_NORMAL)
1201   def call_node_powercycle(self, node, hypervisor):
1202     """Tries to powercycle a node.
1203
1204     This is a single-node call.
1205
1206     """
1207     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1208
1209   @_RpcTimeout(None)
1210   def call_test_delay(self, node_list, duration):
1211     """Sleep for a fixed time on given node(s).
1212
1213     This is a multi-node call.
1214
1215     """
1216     return self._MultiNodeCall(node_list, "test_delay", [duration],
1217                                read_timeout=int(duration + 5))
1218
1219   @_RpcTimeout(_TMO_FAST)
1220   def call_file_storage_dir_create(self, node, file_storage_dir):
1221     """Create the given file storage directory.
1222
1223     This is a single-node call.
1224
1225     """
1226     return self._SingleNodeCall(node, "file_storage_dir_create",
1227                                 [file_storage_dir])
1228
1229   @_RpcTimeout(_TMO_FAST)
1230   def call_file_storage_dir_remove(self, node, file_storage_dir):
1231     """Remove the given file storage directory.
1232
1233     This is a single-node call.
1234
1235     """
1236     return self._SingleNodeCall(node, "file_storage_dir_remove",
1237                                 [file_storage_dir])
1238
1239   @_RpcTimeout(_TMO_FAST)
1240   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1241                                    new_file_storage_dir):
1242     """Rename file storage directory.
1243
1244     This is a single-node call.
1245
1246     """
1247     return self._SingleNodeCall(node, "file_storage_dir_rename",
1248                                 [old_file_storage_dir, new_file_storage_dir])
1249
1250   @classmethod
1251   @_RpcTimeout(_TMO_FAST)
1252   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1253     """Update job queue.
1254
1255     This is a multi-node call.
1256
1257     """
1258     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1259                                     [file_name, cls._Compress(content)],
1260                                     address_list=address_list)
1261
1262   @classmethod
1263   @_RpcTimeout(_TMO_NORMAL)
1264   def call_jobqueue_purge(cls, node):
1265     """Purge job queue.
1266
1267     This is a single-node call.
1268
1269     """
1270     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1271
1272   @classmethod
1273   @_RpcTimeout(_TMO_FAST)
1274   def call_jobqueue_rename(cls, node_list, address_list, rename):
1275     """Rename a job queue file.
1276
1277     This is a multi-node call.
1278
1279     """
1280     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1281                                     address_list=address_list)
1282
1283   @_RpcTimeout(_TMO_NORMAL)
1284   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1285     """Validate the hypervisor params.
1286
1287     This is a multi-node call.
1288
1289     @type node_list: list
1290     @param node_list: the list of nodes to query
1291     @type hvname: string
1292     @param hvname: the hypervisor name
1293     @type hvparams: dict
1294     @param hvparams: the hypervisor parameters to be validated
1295
1296     """
1297     cluster = self._cfg.GetClusterInfo()
1298     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1299     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1300                                [hvname, hv_full])
1301
1302   @_RpcTimeout(_TMO_NORMAL)
1303   def call_x509_cert_create(self, node, validity):
1304     """Creates a new X509 certificate for SSL/TLS.
1305
1306     This is a single-node call.
1307
1308     @type validity: int
1309     @param validity: Validity in seconds
1310
1311     """
1312     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1313
1314   @_RpcTimeout(_TMO_NORMAL)
1315   def call_x509_cert_remove(self, node, name):
1316     """Removes a X509 certificate.
1317
1318     This is a single-node call.
1319
1320     @type name: string
1321     @param name: Certificate name
1322
1323     """
1324     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1325
1326   @_RpcTimeout(_TMO_NORMAL)
1327   def call_import_start(self, node, opts, instance, dest, dest_args):
1328     """Starts a listener for an import.
1329
1330     This is a single-node call.
1331
1332     @type node: string
1333     @param node: Node name
1334     @type instance: C{objects.Instance}
1335     @param instance: Instance object
1336
1337     """
1338     return self._SingleNodeCall(node, "import_start",
1339                                 [opts.ToDict(),
1340                                  self._InstDict(instance), dest,
1341                                  _EncodeImportExportIO(dest, dest_args)])
1342
1343   @_RpcTimeout(_TMO_NORMAL)
1344   def call_export_start(self, node, opts, host, port,
1345                         instance, source, source_args):
1346     """Starts an export daemon.
1347
1348     This is a single-node call.
1349
1350     @type node: string
1351     @param node: Node name
1352     @type instance: C{objects.Instance}
1353     @param instance: Instance object
1354
1355     """
1356     return self._SingleNodeCall(node, "export_start",
1357                                 [opts.ToDict(), host, port,
1358                                  self._InstDict(instance), source,
1359                                  _EncodeImportExportIO(source, source_args)])
1360
1361   @_RpcTimeout(_TMO_FAST)
1362   def call_impexp_status(self, node, names):
1363     """Gets the status of an import or export.
1364
1365     This is a single-node call.
1366
1367     @type node: string
1368     @param node: Node name
1369     @type names: List of strings
1370     @param names: Import/export names
1371     @rtype: List of L{objects.ImportExportStatus} instances
1372     @return: Returns a list of the state of each named import/export or None if
1373              a status couldn't be retrieved
1374
1375     """
1376     result = self._SingleNodeCall(node, "impexp_status", [names])
1377
1378     if not result.fail_msg:
1379       decoded = []
1380
1381       for i in result.payload:
1382         if i is None:
1383           decoded.append(None)
1384           continue
1385         decoded.append(objects.ImportExportStatus.FromDict(i))
1386
1387       result.payload = decoded
1388
1389     return result
1390
1391   @_RpcTimeout(_TMO_NORMAL)
1392   def call_impexp_abort(self, node, name):
1393     """Aborts an import or export.
1394
1395     This is a single-node call.
1396
1397     @type node: string
1398     @param node: Node name
1399     @type name: string
1400     @param name: Import/export name
1401
1402     """
1403     return self._SingleNodeCall(node, "impexp_abort", [name])
1404
1405   @_RpcTimeout(_TMO_NORMAL)
1406   def call_impexp_cleanup(self, node, name):
1407     """Cleans up after an import or export.
1408
1409     This is a single-node call.
1410
1411     @type node: string
1412     @param node: Node name
1413     @type name: string
1414     @param name: Import/export name
1415
1416     """
1417     return self._SingleNodeCall(node, "impexp_cleanup", [name])