jqueue: make replication on job update optional
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52 # Various time constants for the timeout table
53 _TMO_URGENT = 60 # one minute
54 _TMO_FAST = 5 * 60 # five minutes
55 _TMO_NORMAL = 15 * 60 # 15 minutes
56 _TMO_SLOW = 3600 # one hour
57 _TMO_4HRS = 4 * 3600
58 _TMO_1DAY = 86400
59
60 # Timeout table that will be built later by decorators
61 # Guidelines for choosing timeouts:
62 # - call used during watcher: timeout -> 1min, _TMO_URGENT
63 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
64 # - other calls: 15 min, _TMO_NORMAL
65 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
66
67 _TIMEOUTS = {
68 }
69
70
71 def Init():
72   """Initializes the module-global HTTP client manager.
73
74   Must be called before using any RPC function.
75
76   """
77   global _http_manager # pylint: disable-msg=W0603
78
79   assert not _http_manager, "RPC module initialized more than once"
80
81   http.InitSsl()
82
83   _http_manager = http.client.HttpClientManager()
84
85
86 def Shutdown():
87   """Stops the module-global HTTP client manager.
88
89   Must be called before quitting the program.
90
91   """
92   global _http_manager # pylint: disable-msg=W0603
93
94   if _http_manager:
95     _http_manager.Shutdown()
96     _http_manager = None
97
98
99 def _RpcTimeout(secs):
100   """Timeout decorator.
101
102   When applied to a rpc call_* function, it updates the global timeout
103   table with the given function/timeout.
104
105   """
106   def decorator(f):
107     name = f.__name__
108     assert name.startswith("call_")
109     _TIMEOUTS[name[len("call_"):]] = secs
110     return f
111   return decorator
112
113
114 class RpcResult(object):
115   """RPC Result class.
116
117   This class holds an RPC result. It is needed since in multi-node
118   calls we can't raise an exception just because one one out of many
119   failed, and therefore we use this class to encapsulate the result.
120
121   @ivar data: the data payload, for successful results, or None
122   @ivar call: the name of the RPC call
123   @ivar node: the name of the node to which we made the call
124   @ivar offline: whether the operation failed because the node was
125       offline, as opposed to actual failure; offline=True will always
126       imply failed=True, in order to allow simpler checking if
127       the user doesn't care about the exact failure mode
128   @ivar fail_msg: the error message if the call failed
129
130   """
131   def __init__(self, data=None, failed=False, offline=False,
132                call=None, node=None):
133     self.offline = offline
134     self.call = call
135     self.node = node
136
137     if offline:
138       self.fail_msg = "Node is marked offline"
139       self.data = self.payload = None
140     elif failed:
141       self.fail_msg = self._EnsureErr(data)
142       self.data = self.payload = None
143     else:
144       self.data = data
145       if not isinstance(self.data, (tuple, list)):
146         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
147                          type(self.data))
148         self.payload = None
149       elif len(data) != 2:
150         self.fail_msg = ("RPC layer error: invalid result length (%d), "
151                          "expected 2" % len(self.data))
152         self.payload = None
153       elif not self.data[0]:
154         self.fail_msg = self._EnsureErr(self.data[1])
155         self.payload = None
156       else:
157         # finally success
158         self.fail_msg = None
159         self.payload = data[1]
160
161     assert hasattr(self, "call")
162     assert hasattr(self, "data")
163     assert hasattr(self, "fail_msg")
164     assert hasattr(self, "node")
165     assert hasattr(self, "offline")
166     assert hasattr(self, "payload")
167
168   @staticmethod
169   def _EnsureErr(val):
170     """Helper to ensure we return a 'True' value for error."""
171     if val:
172       return val
173     else:
174       return "No error information"
175
176   def Raise(self, msg, prereq=False, ecode=None):
177     """If the result has failed, raise an OpExecError.
178
179     This is used so that LU code doesn't have to check for each
180     result, but instead can call this function.
181
182     """
183     if not self.fail_msg:
184       return
185
186     if not msg: # one could pass None for default message
187       msg = ("Call '%s' to node '%s' has failed: %s" %
188              (self.call, self.node, self.fail_msg))
189     else:
190       msg = "%s: %s" % (msg, self.fail_msg)
191     if prereq:
192       ec = errors.OpPrereqError
193     else:
194       ec = errors.OpExecError
195     if ecode is not None:
196       args = (msg, ecode)
197     else:
198       args = (msg, )
199     raise ec(*args) # pylint: disable-msg=W0142
200
201
202 class Client:
203   """RPC Client class.
204
205   This class, given a (remote) method name, a list of parameters and a
206   list of nodes, will contact (in parallel) all nodes, and return a
207   dict of results (key: node name, value: result).
208
209   One current bug is that generic failure is still signaled by
210   'False' result, which is not good. This overloading of values can
211   cause bugs.
212
213   """
214   def __init__(self, procedure, body, port):
215     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
216                                     " timeouts table")
217     self.procedure = procedure
218     self.body = body
219     self.port = port
220     self.nc = {}
221
222     self._ssl_params = \
223       http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
224                          ssl_cert_path=constants.NODED_CERT_FILE)
225
226   def ConnectList(self, node_list, address_list=None, read_timeout=None):
227     """Add a list of nodes to the target nodes.
228
229     @type node_list: list
230     @param node_list: the list of node names to connect
231     @type address_list: list or None
232     @keyword address_list: either None or a list with node addresses,
233         which must have the same length as the node list
234     @type read_timeout: int
235     @param read_timeout: overwrites the default read timeout for the
236         given operation
237
238     """
239     if address_list is None:
240       address_list = [None for _ in node_list]
241     else:
242       assert len(node_list) == len(address_list), \
243              "Name and address lists should have the same length"
244     for node, address in zip(node_list, address_list):
245       self.ConnectNode(node, address, read_timeout=read_timeout)
246
247   def ConnectNode(self, name, address=None, read_timeout=None):
248     """Add a node to the target list.
249
250     @type name: str
251     @param name: the node name
252     @type address: str
253     @keyword address: the node address, if known
254
255     """
256     if address is None:
257       address = name
258
259     if read_timeout is None:
260       read_timeout = _TIMEOUTS[self.procedure]
261
262     self.nc[name] = \
263       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
264                                     "/%s" % self.procedure,
265                                     post_data=self.body,
266                                     ssl_params=self._ssl_params,
267                                     ssl_verify_peer=True,
268                                     read_timeout=read_timeout)
269
270   def GetResults(self):
271     """Call nodes and return results.
272
273     @rtype: list
274     @return: List of RPC results
275
276     """
277     assert _http_manager, "RPC module not initialized"
278
279     _http_manager.ExecRequests(self.nc.values())
280
281     results = {}
282
283     for name, req in self.nc.iteritems():
284       if req.success and req.resp_status_code == http.HTTP_OK:
285         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
286                                   node=name, call=self.procedure)
287         continue
288
289       # TODO: Better error reporting
290       if req.error:
291         msg = req.error
292       else:
293         msg = req.resp_body
294
295       logging.error("RPC error in %s from node %s: %s",
296                     self.procedure, name, msg)
297       results[name] = RpcResult(data=msg, failed=True, node=name,
298                                 call=self.procedure)
299
300     return results
301
302
303 def _EncodeImportExportIO(ieio, ieioargs):
304   """Encodes import/export I/O information.
305
306   """
307   if ieio == constants.IEIO_RAW_DISK:
308     assert len(ieioargs) == 1
309     return (ieioargs[0].ToDict(), )
310
311   if ieio == constants.IEIO_SCRIPT:
312     assert len(ieioargs) == 2
313     return (ieioargs[0].ToDict(), ieioargs[1])
314
315   return ieioargs
316
317
318 class RpcRunner(object):
319   """RPC runner class"""
320
321   def __init__(self, cfg):
322     """Initialized the rpc runner.
323
324     @type cfg:  C{config.ConfigWriter}
325     @param cfg: the configuration object that will be used to get data
326                 about the cluster
327
328     """
329     self._cfg = cfg
330     self.port = utils.GetDaemonPort(constants.NODED)
331
332   def _InstDict(self, instance, hvp=None, bep=None):
333     """Convert the given instance to a dict.
334
335     This is done via the instance's ToDict() method and additionally
336     we fill the hvparams with the cluster defaults.
337
338     @type instance: L{objects.Instance}
339     @param instance: an Instance object
340     @type hvp: dict or None
341     @param hvp: a dictionary with overridden hypervisor parameters
342     @type bep: dict or None
343     @param bep: a dictionary with overridden backend parameters
344     @rtype: dict
345     @return: the instance dict, with the hvparams filled with the
346         cluster defaults
347
348     """
349     idict = instance.ToDict()
350     cluster = self._cfg.GetClusterInfo()
351     idict["hvparams"] = cluster.FillHV(instance)
352     if hvp is not None:
353       idict["hvparams"].update(hvp)
354     idict["beparams"] = cluster.FillBE(instance)
355     if bep is not None:
356       idict["beparams"].update(bep)
357     for nic in idict["nics"]:
358       nic['nicparams'] = objects.FillDict(
359         cluster.nicparams[constants.PP_DEFAULT],
360         nic['nicparams'])
361     return idict
362
363   def _ConnectList(self, client, node_list, call, read_timeout=None):
364     """Helper for computing node addresses.
365
366     @type client: L{ganeti.rpc.Client}
367     @param client: a C{Client} instance
368     @type node_list: list
369     @param node_list: the node list we should connect
370     @type call: string
371     @param call: the name of the remote procedure call, for filling in
372         correctly any eventual offline nodes' results
373     @type read_timeout: int
374     @param read_timeout: overwrites the default read timeout for the
375         given operation
376
377     """
378     all_nodes = self._cfg.GetAllNodesInfo()
379     name_list = []
380     addr_list = []
381     skip_dict = {}
382     for node in node_list:
383       if node in all_nodes:
384         if all_nodes[node].offline:
385           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
386           continue
387         val = all_nodes[node].primary_ip
388       else:
389         val = None
390       addr_list.append(val)
391       name_list.append(node)
392     if name_list:
393       client.ConnectList(name_list, address_list=addr_list,
394                          read_timeout=read_timeout)
395     return skip_dict
396
397   def _ConnectNode(self, client, node, call, read_timeout=None):
398     """Helper for computing one node's address.
399
400     @type client: L{ganeti.rpc.Client}
401     @param client: a C{Client} instance
402     @type node: str
403     @param node: the node we should connect
404     @type call: string
405     @param call: the name of the remote procedure call, for filling in
406         correctly any eventual offline nodes' results
407     @type read_timeout: int
408     @param read_timeout: overwrites the default read timeout for the
409         given operation
410
411     """
412     node_info = self._cfg.GetNodeInfo(node)
413     if node_info is not None:
414       if node_info.offline:
415         return RpcResult(node=node, offline=True, call=call)
416       addr = node_info.primary_ip
417     else:
418       addr = None
419     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
420
421   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
422     """Helper for making a multi-node call
423
424     """
425     body = serializer.DumpJson(args, indent=False)
426     c = Client(procedure, body, self.port)
427     skip_dict = self._ConnectList(c, node_list, procedure,
428                                   read_timeout=read_timeout)
429     skip_dict.update(c.GetResults())
430     return skip_dict
431
432   @classmethod
433   def _StaticMultiNodeCall(cls, node_list, procedure, args,
434                            address_list=None, read_timeout=None):
435     """Helper for making a multi-node static call
436
437     """
438     body = serializer.DumpJson(args, indent=False)
439     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
440     c.ConnectList(node_list, address_list=address_list,
441                   read_timeout=read_timeout)
442     return c.GetResults()
443
444   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
445     """Helper for making a single-node call
446
447     """
448     body = serializer.DumpJson(args, indent=False)
449     c = Client(procedure, body, self.port)
450     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
451     if result is None:
452       # we did connect, node is not offline
453       result = c.GetResults()[node]
454     return result
455
456   @classmethod
457   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
458     """Helper for making a single-node static call
459
460     """
461     body = serializer.DumpJson(args, indent=False)
462     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
463     c.ConnectNode(node, read_timeout=read_timeout)
464     return c.GetResults()[node]
465
466   @staticmethod
467   def _Compress(data):
468     """Compresses a string for transport over RPC.
469
470     Small amounts of data are not compressed.
471
472     @type data: str
473     @param data: Data
474     @rtype: tuple
475     @return: Encoded data to send
476
477     """
478     # Small amounts of data are not compressed
479     if len(data) < 512:
480       return (constants.RPC_ENCODING_NONE, data)
481
482     # Compress with zlib and encode in base64
483     return (constants.RPC_ENCODING_ZLIB_BASE64,
484             base64.b64encode(zlib.compress(data, 3)))
485
486   #
487   # Begin RPC calls
488   #
489
490   @_RpcTimeout(_TMO_URGENT)
491   def call_lv_list(self, node_list, vg_name):
492     """Gets the logical volumes present in a given volume group.
493
494     This is a multi-node call.
495
496     """
497     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
498
499   @_RpcTimeout(_TMO_URGENT)
500   def call_vg_list(self, node_list):
501     """Gets the volume group list.
502
503     This is a multi-node call.
504
505     """
506     return self._MultiNodeCall(node_list, "vg_list", [])
507
508   @_RpcTimeout(_TMO_NORMAL)
509   def call_storage_list(self, node_list, su_name, su_args, name, fields):
510     """Get list of storage units.
511
512     This is a multi-node call.
513
514     """
515     return self._MultiNodeCall(node_list, "storage_list",
516                                [su_name, su_args, name, fields])
517
518   @_RpcTimeout(_TMO_NORMAL)
519   def call_storage_modify(self, node, su_name, su_args, name, changes):
520     """Modify a storage unit.
521
522     This is a single-node call.
523
524     """
525     return self._SingleNodeCall(node, "storage_modify",
526                                 [su_name, su_args, name, changes])
527
528   @_RpcTimeout(_TMO_NORMAL)
529   def call_storage_execute(self, node, su_name, su_args, name, op):
530     """Executes an operation on a storage unit.
531
532     This is a single-node call.
533
534     """
535     return self._SingleNodeCall(node, "storage_execute",
536                                 [su_name, su_args, name, op])
537
538   @_RpcTimeout(_TMO_URGENT)
539   def call_bridges_exist(self, node, bridges_list):
540     """Checks if a node has all the bridges given.
541
542     This method checks if all bridges given in the bridges_list are
543     present on the remote node, so that an instance that uses interfaces
544     on those bridges can be started.
545
546     This is a single-node call.
547
548     """
549     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
550
551   @_RpcTimeout(_TMO_NORMAL)
552   def call_instance_start(self, node, instance, hvp, bep):
553     """Starts an instance.
554
555     This is a single-node call.
556
557     """
558     idict = self._InstDict(instance, hvp=hvp, bep=bep)
559     return self._SingleNodeCall(node, "instance_start", [idict])
560
561   @_RpcTimeout(_TMO_NORMAL)
562   def call_instance_shutdown(self, node, instance, timeout):
563     """Stops an instance.
564
565     This is a single-node call.
566
567     """
568     return self._SingleNodeCall(node, "instance_shutdown",
569                                 [self._InstDict(instance), timeout])
570
571   @_RpcTimeout(_TMO_NORMAL)
572   def call_migration_info(self, node, instance):
573     """Gather the information necessary to prepare an instance migration.
574
575     This is a single-node call.
576
577     @type node: string
578     @param node: the node on which the instance is currently running
579     @type instance: C{objects.Instance}
580     @param instance: the instance definition
581
582     """
583     return self._SingleNodeCall(node, "migration_info",
584                                 [self._InstDict(instance)])
585
586   @_RpcTimeout(_TMO_NORMAL)
587   def call_accept_instance(self, node, instance, info, target):
588     """Prepare a node to accept an instance.
589
590     This is a single-node call.
591
592     @type node: string
593     @param node: the target node for the migration
594     @type instance: C{objects.Instance}
595     @param instance: the instance definition
596     @type info: opaque/hypervisor specific (string/data)
597     @param info: result for the call_migration_info call
598     @type target: string
599     @param target: target hostname (usually ip address) (on the node itself)
600
601     """
602     return self._SingleNodeCall(node, "accept_instance",
603                                 [self._InstDict(instance), info, target])
604
605   @_RpcTimeout(_TMO_NORMAL)
606   def call_finalize_migration(self, node, instance, info, success):
607     """Finalize any target-node migration specific operation.
608
609     This is called both in case of a successful migration and in case of error
610     (in which case it should abort the migration).
611
612     This is a single-node call.
613
614     @type node: string
615     @param node: the target node for the migration
616     @type instance: C{objects.Instance}
617     @param instance: the instance definition
618     @type info: opaque/hypervisor specific (string/data)
619     @param info: result for the call_migration_info call
620     @type success: boolean
621     @param success: whether the migration was a success or a failure
622
623     """
624     return self._SingleNodeCall(node, "finalize_migration",
625                                 [self._InstDict(instance), info, success])
626
627   @_RpcTimeout(_TMO_SLOW)
628   def call_instance_migrate(self, node, instance, target, live):
629     """Migrate an instance.
630
631     This is a single-node call.
632
633     @type node: string
634     @param node: the node on which the instance is currently running
635     @type instance: C{objects.Instance}
636     @param instance: the instance definition
637     @type target: string
638     @param target: the target node name
639     @type live: boolean
640     @param live: whether the migration should be done live or not (the
641         interpretation of this parameter is left to the hypervisor)
642
643     """
644     return self._SingleNodeCall(node, "instance_migrate",
645                                 [self._InstDict(instance), target, live])
646
647   @_RpcTimeout(_TMO_NORMAL)
648   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
649     """Reboots an instance.
650
651     This is a single-node call.
652
653     """
654     return self._SingleNodeCall(node, "instance_reboot",
655                                 [self._InstDict(inst), reboot_type,
656                                  shutdown_timeout])
657
658   @_RpcTimeout(_TMO_1DAY)
659   def call_instance_os_add(self, node, inst, reinstall, debug):
660     """Installs an OS on the given instance.
661
662     This is a single-node call.
663
664     """
665     return self._SingleNodeCall(node, "instance_os_add",
666                                 [self._InstDict(inst), reinstall, debug])
667
668   @_RpcTimeout(_TMO_SLOW)
669   def call_instance_run_rename(self, node, inst, old_name, debug):
670     """Run the OS rename script for an instance.
671
672     This is a single-node call.
673
674     """
675     return self._SingleNodeCall(node, "instance_run_rename",
676                                 [self._InstDict(inst), old_name, debug])
677
678   @_RpcTimeout(_TMO_URGENT)
679   def call_instance_info(self, node, instance, hname):
680     """Returns information about a single instance.
681
682     This is a single-node call.
683
684     @type node: list
685     @param node: the list of nodes to query
686     @type instance: string
687     @param instance: the instance name
688     @type hname: string
689     @param hname: the hypervisor type of the instance
690
691     """
692     return self._SingleNodeCall(node, "instance_info", [instance, hname])
693
694   @_RpcTimeout(_TMO_NORMAL)
695   def call_instance_migratable(self, node, instance):
696     """Checks whether the given instance can be migrated.
697
698     This is a single-node call.
699
700     @param node: the node to query
701     @type instance: L{objects.Instance}
702     @param instance: the instance to check
703
704
705     """
706     return self._SingleNodeCall(node, "instance_migratable",
707                                 [self._InstDict(instance)])
708
709   @_RpcTimeout(_TMO_URGENT)
710   def call_all_instances_info(self, node_list, hypervisor_list):
711     """Returns information about all instances on the given nodes.
712
713     This is a multi-node call.
714
715     @type node_list: list
716     @param node_list: the list of nodes to query
717     @type hypervisor_list: list
718     @param hypervisor_list: the hypervisors to query for instances
719
720     """
721     return self._MultiNodeCall(node_list, "all_instances_info",
722                                [hypervisor_list])
723
724   @_RpcTimeout(_TMO_URGENT)
725   def call_instance_list(self, node_list, hypervisor_list):
726     """Returns the list of running instances on a given node.
727
728     This is a multi-node call.
729
730     @type node_list: list
731     @param node_list: the list of nodes to query
732     @type hypervisor_list: list
733     @param hypervisor_list: the hypervisors to query for instances
734
735     """
736     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
737
738   @_RpcTimeout(_TMO_FAST)
739   def call_node_tcp_ping(self, node, source, target, port, timeout,
740                          live_port_needed):
741     """Do a TcpPing on the remote node
742
743     This is a single-node call.
744
745     """
746     return self._SingleNodeCall(node, "node_tcp_ping",
747                                 [source, target, port, timeout,
748                                  live_port_needed])
749
750   @_RpcTimeout(_TMO_FAST)
751   def call_node_has_ip_address(self, node, address):
752     """Checks if a node has the given IP address.
753
754     This is a single-node call.
755
756     """
757     return self._SingleNodeCall(node, "node_has_ip_address", [address])
758
759   @_RpcTimeout(_TMO_URGENT)
760   def call_node_info(self, node_list, vg_name, hypervisor_type):
761     """Return node information.
762
763     This will return memory information and volume group size and free
764     space.
765
766     This is a multi-node call.
767
768     @type node_list: list
769     @param node_list: the list of nodes to query
770     @type vg_name: C{string}
771     @param vg_name: the name of the volume group to ask for disk space
772         information
773     @type hypervisor_type: C{str}
774     @param hypervisor_type: the name of the hypervisor to ask for
775         memory information
776
777     """
778     return self._MultiNodeCall(node_list, "node_info",
779                                [vg_name, hypervisor_type])
780
781   @_RpcTimeout(_TMO_NORMAL)
782   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
783     """Add a node to the cluster.
784
785     This is a single-node call.
786
787     """
788     return self._SingleNodeCall(node, "node_add",
789                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
790
791   @_RpcTimeout(_TMO_NORMAL)
792   def call_node_verify(self, node_list, checkdict, cluster_name):
793     """Request verification of given parameters.
794
795     This is a multi-node call.
796
797     """
798     return self._MultiNodeCall(node_list, "node_verify",
799                                [checkdict, cluster_name])
800
801   @classmethod
802   @_RpcTimeout(_TMO_FAST)
803   def call_node_start_master(cls, node, start_daemons, no_voting):
804     """Tells a node to activate itself as a master.
805
806     This is a single-node call.
807
808     """
809     return cls._StaticSingleNodeCall(node, "node_start_master",
810                                      [start_daemons, no_voting])
811
812   @classmethod
813   @_RpcTimeout(_TMO_FAST)
814   def call_node_stop_master(cls, node, stop_daemons):
815     """Tells a node to demote itself from master status.
816
817     This is a single-node call.
818
819     """
820     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
821
822   @classmethod
823   @_RpcTimeout(_TMO_URGENT)
824   def call_master_info(cls, node_list):
825     """Query master info.
826
827     This is a multi-node call.
828
829     """
830     # TODO: should this method query down nodes?
831     return cls._StaticMultiNodeCall(node_list, "master_info", [])
832
833   @classmethod
834   @_RpcTimeout(_TMO_URGENT)
835   def call_version(cls, node_list):
836     """Query node version.
837
838     This is a multi-node call.
839
840     """
841     return cls._StaticMultiNodeCall(node_list, "version", [])
842
843   @_RpcTimeout(_TMO_NORMAL)
844   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
845     """Request creation of a given block device.
846
847     This is a single-node call.
848
849     """
850     return self._SingleNodeCall(node, "blockdev_create",
851                                 [bdev.ToDict(), size, owner, on_primary, info])
852
853   @_RpcTimeout(_TMO_NORMAL)
854   def call_blockdev_remove(self, node, bdev):
855     """Request removal of a given block device.
856
857     This is a single-node call.
858
859     """
860     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
861
862   @_RpcTimeout(_TMO_NORMAL)
863   def call_blockdev_rename(self, node, devlist):
864     """Request rename of the given block devices.
865
866     This is a single-node call.
867
868     """
869     return self._SingleNodeCall(node, "blockdev_rename",
870                                 [(d.ToDict(), uid) for d, uid in devlist])
871
872   @_RpcTimeout(_TMO_NORMAL)
873   def call_blockdev_assemble(self, node, disk, owner, on_primary):
874     """Request assembling of a given block device.
875
876     This is a single-node call.
877
878     """
879     return self._SingleNodeCall(node, "blockdev_assemble",
880                                 [disk.ToDict(), owner, on_primary])
881
882   @_RpcTimeout(_TMO_NORMAL)
883   def call_blockdev_shutdown(self, node, disk):
884     """Request shutdown of a given block device.
885
886     This is a single-node call.
887
888     """
889     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
890
891   @_RpcTimeout(_TMO_NORMAL)
892   def call_blockdev_addchildren(self, node, bdev, ndevs):
893     """Request adding a list of children to a (mirroring) device.
894
895     This is a single-node call.
896
897     """
898     return self._SingleNodeCall(node, "blockdev_addchildren",
899                                 [bdev.ToDict(),
900                                  [disk.ToDict() for disk in ndevs]])
901
902   @_RpcTimeout(_TMO_NORMAL)
903   def call_blockdev_removechildren(self, node, bdev, ndevs):
904     """Request removing a list of children from a (mirroring) device.
905
906     This is a single-node call.
907
908     """
909     return self._SingleNodeCall(node, "blockdev_removechildren",
910                                 [bdev.ToDict(),
911                                  [disk.ToDict() for disk in ndevs]])
912
913   @_RpcTimeout(_TMO_NORMAL)
914   def call_blockdev_getmirrorstatus(self, node, disks):
915     """Request status of a (mirroring) device.
916
917     This is a single-node call.
918
919     """
920     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
921                                   [dsk.ToDict() for dsk in disks])
922     if not result.fail_msg:
923       result.payload = [objects.BlockDevStatus.FromDict(i)
924                         for i in result.payload]
925     return result
926
927   @_RpcTimeout(_TMO_NORMAL)
928   def call_blockdev_find(self, node, disk):
929     """Request identification of a given block device.
930
931     This is a single-node call.
932
933     """
934     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
935     if not result.fail_msg and result.payload is not None:
936       result.payload = objects.BlockDevStatus.FromDict(result.payload)
937     return result
938
939   @_RpcTimeout(_TMO_NORMAL)
940   def call_blockdev_close(self, node, instance_name, disks):
941     """Closes the given block devices.
942
943     This is a single-node call.
944
945     """
946     params = [instance_name, [cf.ToDict() for cf in disks]]
947     return self._SingleNodeCall(node, "blockdev_close", params)
948
949   @_RpcTimeout(_TMO_NORMAL)
950   def call_blockdev_getsizes(self, node, disks):
951     """Returns the size of the given disks.
952
953     This is a single-node call.
954
955     """
956     params = [[cf.ToDict() for cf in disks]]
957     return self._SingleNodeCall(node, "blockdev_getsize", params)
958
959   @_RpcTimeout(_TMO_NORMAL)
960   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
961     """Disconnects the network of the given drbd devices.
962
963     This is a multi-node call.
964
965     """
966     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
967                                [nodes_ip, [cf.ToDict() for cf in disks]])
968
969   @_RpcTimeout(_TMO_NORMAL)
970   def call_drbd_attach_net(self, node_list, nodes_ip,
971                            disks, instance_name, multimaster):
972     """Disconnects the given drbd devices.
973
974     This is a multi-node call.
975
976     """
977     return self._MultiNodeCall(node_list, "drbd_attach_net",
978                                [nodes_ip, [cf.ToDict() for cf in disks],
979                                 instance_name, multimaster])
980
981   @_RpcTimeout(_TMO_SLOW)
982   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
983     """Waits for the synchronization of drbd devices is complete.
984
985     This is a multi-node call.
986
987     """
988     return self._MultiNodeCall(node_list, "drbd_wait_sync",
989                                [nodes_ip, [cf.ToDict() for cf in disks]])
990
991   @classmethod
992   @_RpcTimeout(_TMO_NORMAL)
993   def call_upload_file(cls, node_list, file_name, address_list=None):
994     """Upload a file.
995
996     The node will refuse the operation in case the file is not on the
997     approved file list.
998
999     This is a multi-node call.
1000
1001     @type node_list: list
1002     @param node_list: the list of node names to upload to
1003     @type file_name: str
1004     @param file_name: the filename to upload
1005     @type address_list: list or None
1006     @keyword address_list: an optional list of node addresses, in order
1007         to optimize the RPC speed
1008
1009     """
1010     file_contents = utils.ReadFile(file_name)
1011     data = cls._Compress(file_contents)
1012     st = os.stat(file_name)
1013     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1014               st.st_atime, st.st_mtime]
1015     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1016                                     address_list=address_list)
1017
1018   @classmethod
1019   @_RpcTimeout(_TMO_NORMAL)
1020   def call_write_ssconf_files(cls, node_list, values):
1021     """Write ssconf files.
1022
1023     This is a multi-node call.
1024
1025     """
1026     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1027
1028   @_RpcTimeout(_TMO_FAST)
1029   def call_os_diagnose(self, node_list):
1030     """Request a diagnose of OS definitions.
1031
1032     This is a multi-node call.
1033
1034     """
1035     return self._MultiNodeCall(node_list, "os_diagnose", [])
1036
1037   @_RpcTimeout(_TMO_FAST)
1038   def call_os_get(self, node, name):
1039     """Returns an OS definition.
1040
1041     This is a single-node call.
1042
1043     """
1044     result = self._SingleNodeCall(node, "os_get", [name])
1045     if not result.fail_msg and isinstance(result.payload, dict):
1046       result.payload = objects.OS.FromDict(result.payload)
1047     return result
1048
1049   @_RpcTimeout(_TMO_NORMAL)
1050   def call_hooks_runner(self, node_list, hpath, phase, env):
1051     """Call the hooks runner.
1052
1053     Args:
1054       - op: the OpCode instance
1055       - env: a dictionary with the environment
1056
1057     This is a multi-node call.
1058
1059     """
1060     params = [hpath, phase, env]
1061     return self._MultiNodeCall(node_list, "hooks_runner", params)
1062
1063   @_RpcTimeout(_TMO_NORMAL)
1064   def call_iallocator_runner(self, node, name, idata):
1065     """Call an iallocator on a remote node
1066
1067     Args:
1068       - name: the iallocator name
1069       - input: the json-encoded input string
1070
1071     This is a single-node call.
1072
1073     """
1074     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1075
1076   @_RpcTimeout(_TMO_NORMAL)
1077   def call_blockdev_grow(self, node, cf_bdev, amount):
1078     """Request a snapshot of the given block device.
1079
1080     This is a single-node call.
1081
1082     """
1083     return self._SingleNodeCall(node, "blockdev_grow",
1084                                 [cf_bdev.ToDict(), amount])
1085
1086   @_RpcTimeout(_TMO_1DAY)
1087   def call_blockdev_export(self, node, cf_bdev,
1088                            dest_node, dest_path, cluster_name):
1089     """Export a given disk to another node.
1090
1091     This is a single-node call.
1092
1093     """
1094     return self._SingleNodeCall(node, "blockdev_export",
1095                                 [cf_bdev.ToDict(), dest_node, dest_path,
1096                                  cluster_name])
1097
1098   @_RpcTimeout(_TMO_NORMAL)
1099   def call_blockdev_snapshot(self, node, cf_bdev):
1100     """Request a snapshot of the given block device.
1101
1102     This is a single-node call.
1103
1104     """
1105     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1106
1107   @_RpcTimeout(_TMO_NORMAL)
1108   def call_finalize_export(self, node, instance, snap_disks):
1109     """Request the completion of an export operation.
1110
1111     This writes the export config file, etc.
1112
1113     This is a single-node call.
1114
1115     """
1116     flat_disks = []
1117     for disk in snap_disks:
1118       if isinstance(disk, bool):
1119         flat_disks.append(disk)
1120       else:
1121         flat_disks.append(disk.ToDict())
1122
1123     return self._SingleNodeCall(node, "finalize_export",
1124                                 [self._InstDict(instance), flat_disks])
1125
1126   @_RpcTimeout(_TMO_FAST)
1127   def call_export_info(self, node, path):
1128     """Queries the export information in a given path.
1129
1130     This is a single-node call.
1131
1132     """
1133     return self._SingleNodeCall(node, "export_info", [path])
1134
1135   @_RpcTimeout(_TMO_FAST)
1136   def call_export_list(self, node_list):
1137     """Gets the stored exports list.
1138
1139     This is a multi-node call.
1140
1141     """
1142     return self._MultiNodeCall(node_list, "export_list", [])
1143
1144   @_RpcTimeout(_TMO_FAST)
1145   def call_export_remove(self, node, export):
1146     """Requests removal of a given export.
1147
1148     This is a single-node call.
1149
1150     """
1151     return self._SingleNodeCall(node, "export_remove", [export])
1152
1153   @classmethod
1154   @_RpcTimeout(_TMO_NORMAL)
1155   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1156     """Requests a node to clean the cluster information it has.
1157
1158     This will remove the configuration information from the ganeti data
1159     dir.
1160
1161     This is a single-node call.
1162
1163     """
1164     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1165                                      [modify_ssh_setup])
1166
1167   @_RpcTimeout(_TMO_FAST)
1168   def call_node_volumes(self, node_list):
1169     """Gets all volumes on node(s).
1170
1171     This is a multi-node call.
1172
1173     """
1174     return self._MultiNodeCall(node_list, "node_volumes", [])
1175
1176   @_RpcTimeout(_TMO_FAST)
1177   def call_node_demote_from_mc(self, node):
1178     """Demote a node from the master candidate role.
1179
1180     This is a single-node call.
1181
1182     """
1183     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1184
1185   @_RpcTimeout(_TMO_NORMAL)
1186   def call_node_powercycle(self, node, hypervisor):
1187     """Tries to powercycle a node.
1188
1189     This is a single-node call.
1190
1191     """
1192     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1193
1194   @_RpcTimeout(None)
1195   def call_test_delay(self, node_list, duration):
1196     """Sleep for a fixed time on given node(s).
1197
1198     This is a multi-node call.
1199
1200     """
1201     return self._MultiNodeCall(node_list, "test_delay", [duration],
1202                                read_timeout=int(duration + 5))
1203
1204   @_RpcTimeout(_TMO_FAST)
1205   def call_file_storage_dir_create(self, node, file_storage_dir):
1206     """Create the given file storage directory.
1207
1208     This is a single-node call.
1209
1210     """
1211     return self._SingleNodeCall(node, "file_storage_dir_create",
1212                                 [file_storage_dir])
1213
1214   @_RpcTimeout(_TMO_FAST)
1215   def call_file_storage_dir_remove(self, node, file_storage_dir):
1216     """Remove the given file storage directory.
1217
1218     This is a single-node call.
1219
1220     """
1221     return self._SingleNodeCall(node, "file_storage_dir_remove",
1222                                 [file_storage_dir])
1223
1224   @_RpcTimeout(_TMO_FAST)
1225   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1226                                    new_file_storage_dir):
1227     """Rename file storage directory.
1228
1229     This is a single-node call.
1230
1231     """
1232     return self._SingleNodeCall(node, "file_storage_dir_rename",
1233                                 [old_file_storage_dir, new_file_storage_dir])
1234
1235   @classmethod
1236   @_RpcTimeout(_TMO_FAST)
1237   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1238     """Update job queue.
1239
1240     This is a multi-node call.
1241
1242     """
1243     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1244                                     [file_name, cls._Compress(content)],
1245                                     address_list=address_list)
1246
1247   @classmethod
1248   @_RpcTimeout(_TMO_NORMAL)
1249   def call_jobqueue_purge(cls, node):
1250     """Purge job queue.
1251
1252     This is a single-node call.
1253
1254     """
1255     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1256
1257   @classmethod
1258   @_RpcTimeout(_TMO_FAST)
1259   def call_jobqueue_rename(cls, node_list, address_list, rename):
1260     """Rename a job queue file.
1261
1262     This is a multi-node call.
1263
1264     """
1265     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1266                                     address_list=address_list)
1267
1268   @_RpcTimeout(_TMO_NORMAL)
1269   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1270     """Validate the hypervisor params.
1271
1272     This is a multi-node call.
1273
1274     @type node_list: list
1275     @param node_list: the list of nodes to query
1276     @type hvname: string
1277     @param hvname: the hypervisor name
1278     @type hvparams: dict
1279     @param hvparams: the hypervisor parameters to be validated
1280
1281     """
1282     cluster = self._cfg.GetClusterInfo()
1283     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1284     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1285                                [hvname, hv_full])
1286
1287   @_RpcTimeout(_TMO_NORMAL)
1288   def call_x509_cert_create(self, node, validity):
1289     """Creates a new X509 certificate for SSL/TLS.
1290
1291     This is a single-node call.
1292
1293     @type validity: int
1294     @param validity: Validity in seconds
1295
1296     """
1297     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1298
1299   @_RpcTimeout(_TMO_NORMAL)
1300   def call_x509_cert_remove(self, node, name):
1301     """Removes a X509 certificate.
1302
1303     This is a single-node call.
1304
1305     @type name: string
1306     @param name: Certificate name
1307
1308     """
1309     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1310
1311   @_RpcTimeout(_TMO_NORMAL)
1312   def call_import_start(self, node, opts, instance, dest, dest_args):
1313     """Starts a listener for an import.
1314
1315     This is a single-node call.
1316
1317     @type node: string
1318     @param node: Node name
1319     @type instance: C{objects.Instance}
1320     @param instance: Instance object
1321
1322     """
1323     return self._SingleNodeCall(node, "import_start",
1324                                 [opts.ToDict(),
1325                                  self._InstDict(instance), dest,
1326                                  _EncodeImportExportIO(dest, dest_args)])
1327
1328   @_RpcTimeout(_TMO_NORMAL)
1329   def call_export_start(self, node, opts, host, port,
1330                         instance, source, source_args):
1331     """Starts an export daemon.
1332
1333     This is a single-node call.
1334
1335     @type node: string
1336     @param node: Node name
1337     @type instance: C{objects.Instance}
1338     @param instance: Instance object
1339
1340     """
1341     return self._SingleNodeCall(node, "export_start",
1342                                 [opts.ToDict(), host, port,
1343                                  self._InstDict(instance), source,
1344                                  _EncodeImportExportIO(source, source_args)])
1345
1346   @_RpcTimeout(_TMO_FAST)
1347   def call_impexp_status(self, node, names):
1348     """Gets the status of an import or export.
1349
1350     This is a single-node call.
1351
1352     @type node: string
1353     @param node: Node name
1354     @type names: List of strings
1355     @param names: Import/export names
1356     @rtype: List of L{objects.ImportExportStatus} instances
1357     @return: Returns a list of the state of each named import/export or None if
1358              a status couldn't be retrieved
1359
1360     """
1361     result = self._SingleNodeCall(node, "impexp_status", [names])
1362
1363     if not result.fail_msg:
1364       decoded = []
1365
1366       for i in result.payload:
1367         if i is None:
1368           decoded.append(None)
1369           continue
1370         decoded.append(objects.ImportExportStatus.FromDict(i))
1371
1372       result.payload = decoded
1373
1374     return result
1375
1376   @_RpcTimeout(_TMO_NORMAL)
1377   def call_impexp_abort(self, node, name):
1378     """Aborts an import or export.
1379
1380     This is a single-node call.
1381
1382     @type node: string
1383     @param node: Node name
1384     @type name: string
1385     @param name: Import/export name
1386
1387     """
1388     return self._SingleNodeCall(node, "impexp_abort", [name])
1389
1390   @_RpcTimeout(_TMO_NORMAL)
1391   def call_impexp_cleanup(self, node, name):
1392     """Cleans up after an import or export.
1393
1394     This is a single-node call.
1395
1396     @type node: string
1397     @param node: Node name
1398     @type name: string
1399     @param name: Import/export name
1400
1401     """
1402     return self._SingleNodeCall(node, "impexp_cleanup", [name])