4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client # pylint: disable-msg=W0611
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
56 _RPC_CLIENT_HEADERS = [
57 "Content-type: %s" % http.HTTP_APP_JSON,
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
81 """Initializes the module-global HTTP client manager.
83 Must be called before using any RPC function and while exactly one thread is
87 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88 # one thread running. This check is just a safety measure -- it doesn't
90 assert threading.activeCount() == 1, \
91 "Found more than one active thread when initializing pycURL"
93 logging.info("Using PycURL %s", pycurl.version)
95 pycurl.global_init(pycurl.GLOBAL_ALL)
99 """Stops the module-global HTTP client manager.
101 Must be called before quitting the program and while exactly one thread is
105 pycurl.global_cleanup()
108 def _ConfigRpcCurl(curl):
109 noded_cert = str(constants.NODED_CERT_FILE)
111 curl.setopt(pycurl.FOLLOWLOCATION, False)
112 curl.setopt(pycurl.CAINFO, noded_cert)
113 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114 curl.setopt(pycurl.SSL_VERIFYPEER, True)
115 curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116 curl.setopt(pycurl.SSLCERT, noded_cert)
117 curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118 curl.setopt(pycurl.SSLKEY, noded_cert)
119 curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
122 class _RpcThreadLocal(threading.local):
123 def GetHttpClientPool(self):
124 """Returns a per-thread HTTP client pool.
126 @rtype: L{http.client.HttpClientPool}
131 except AttributeError:
132 pool = http.client.HttpClientPool(_ConfigRpcCurl)
138 _thread_local = _RpcThreadLocal()
141 def _RpcTimeout(secs):
142 """Timeout decorator.
144 When applied to a rpc call_* function, it updates the global timeout
145 table with the given function/timeout.
150 assert name.startswith("call_")
151 _TIMEOUTS[name[len("call_"):]] = secs
157 """RPC-wrapper decorator.
159 When applied to a function, it runs it with the RPC system
160 initialized, and it shutsdown the system afterwards. This means the
161 function must be called without RPC being initialized.
164 def wrapper(*args, **kwargs):
167 return fn(*args, **kwargs)
173 class RpcResult(object):
176 This class holds an RPC result. It is needed since in multi-node
177 calls we can't raise an exception just because one one out of many
178 failed, and therefore we use this class to encapsulate the result.
180 @ivar data: the data payload, for successful results, or None
181 @ivar call: the name of the RPC call
182 @ivar node: the name of the node to which we made the call
183 @ivar offline: whether the operation failed because the node was
184 offline, as opposed to actual failure; offline=True will always
185 imply failed=True, in order to allow simpler checking if
186 the user doesn't care about the exact failure mode
187 @ivar fail_msg: the error message if the call failed
190 def __init__(self, data=None, failed=False, offline=False,
191 call=None, node=None):
192 self.offline = offline
197 self.fail_msg = "Node is marked offline"
198 self.data = self.payload = None
200 self.fail_msg = self._EnsureErr(data)
201 self.data = self.payload = None
204 if not isinstance(self.data, (tuple, list)):
205 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
209 self.fail_msg = ("RPC layer error: invalid result length (%d), "
210 "expected 2" % len(self.data))
212 elif not self.data[0]:
213 self.fail_msg = self._EnsureErr(self.data[1])
218 self.payload = data[1]
220 assert hasattr(self, "call")
221 assert hasattr(self, "data")
222 assert hasattr(self, "fail_msg")
223 assert hasattr(self, "node")
224 assert hasattr(self, "offline")
225 assert hasattr(self, "payload")
229 """Helper to ensure we return a 'True' value for error."""
233 return "No error information"
235 def Raise(self, msg, prereq=False, ecode=None):
236 """If the result has failed, raise an OpExecError.
238 This is used so that LU code doesn't have to check for each
239 result, but instead can call this function.
242 if not self.fail_msg:
245 if not msg: # one could pass None for default message
246 msg = ("Call '%s' to node '%s' has failed: %s" %
247 (self.call, self.node, self.fail_msg))
249 msg = "%s: %s" % (msg, self.fail_msg)
251 ec = errors.OpPrereqError
253 ec = errors.OpExecError
254 if ecode is not None:
258 raise ec(*args) # pylint: disable-msg=W0142
261 def _AddressLookup(node_list,
262 ssc=ssconf.SimpleStore,
263 nslookup_fn=netutils.Hostname.GetIP):
264 """Return addresses for given node names.
266 @type node_list: list
267 @param node_list: List of node names
269 @param ssc: SimpleStore class that is used to obtain node->ip mappings
270 @type nslookup_fn: callable
271 @param nslookup_fn: function use to do NS lookup
272 @rtype: list of addresses and/or None's
273 @returns: List of corresponding addresses, if found
277 iplist = ss.GetNodePrimaryIPList()
278 family = ss.GetPrimaryIPFamily()
280 ipmap = dict(entry.split() for entry in iplist)
281 for node in node_list:
282 address = ipmap.get(node)
284 address = nslookup_fn(node, family=family)
285 addresses.append(address)
293 This class, given a (remote) method name, a list of parameters and a
294 list of nodes, will contact (in parallel) all nodes, and return a
295 dict of results (key: node name, value: result).
297 One current bug is that generic failure is still signaled by
298 'False' result, which is not good. This overloading of values can
302 def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
303 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
305 self.procedure = procedure
309 self._address_lookup_fn = address_lookup_fn
311 def ConnectList(self, node_list, address_list=None, read_timeout=None):
312 """Add a list of nodes to the target nodes.
314 @type node_list: list
315 @param node_list: the list of node names to connect
316 @type address_list: list or None
317 @keyword address_list: either None or a list with node addresses,
318 which must have the same length as the node list
319 @type read_timeout: int
320 @param read_timeout: overwrites default timeout for operation
323 if address_list is None:
324 # Always use IP address instead of node name
325 address_list = self._address_lookup_fn(node_list)
327 assert len(node_list) == len(address_list), \
328 "Name and address lists must have the same length"
330 for node, address in zip(node_list, address_list):
331 self.ConnectNode(node, address, read_timeout=read_timeout)
333 def ConnectNode(self, name, address=None, read_timeout=None):
334 """Add a node to the target list.
337 @param name: the node name
339 @param address: the node address, if known
340 @type read_timeout: int
341 @param read_timeout: overwrites default timeout for operation
345 # Always use IP address instead of node name
346 address = self._address_lookup_fn([name])[0]
348 assert(address is not None)
350 if read_timeout is None:
351 read_timeout = _TIMEOUTS[self.procedure]
353 self._request[name] = \
354 http.client.HttpClientRequest(str(address), self.port,
355 http.HTTP_PUT, str("/%s" % self.procedure),
356 headers=_RPC_CLIENT_HEADERS,
357 post_data=str(self.body),
358 read_timeout=read_timeout)
360 def GetResults(self, http_pool=None):
361 """Call nodes and return results.
364 @return: List of RPC results
368 http_pool = _thread_local.GetHttpClientPool()
370 http_pool.ProcessRequests(self._request.values())
374 for name, req in self._request.iteritems():
375 if req.success and req.resp_status_code == http.HTTP_OK:
376 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
377 node=name, call=self.procedure)
380 # TODO: Better error reporting
386 logging.error("RPC error in %s from node %s: %s",
387 self.procedure, name, msg)
388 results[name] = RpcResult(data=msg, failed=True, node=name,
394 def _EncodeImportExportIO(ieio, ieioargs):
395 """Encodes import/export I/O information.
398 if ieio == constants.IEIO_RAW_DISK:
399 assert len(ieioargs) == 1
400 return (ieioargs[0].ToDict(), )
402 if ieio == constants.IEIO_SCRIPT:
403 assert len(ieioargs) == 2
404 return (ieioargs[0].ToDict(), ieioargs[1])
409 class RpcRunner(object):
410 """RPC runner class"""
412 def __init__(self, cfg):
413 """Initialized the rpc runner.
415 @type cfg: C{config.ConfigWriter}
416 @param cfg: the configuration object that will be used to get data
421 self.port = netutils.GetDaemonPort(constants.NODED)
423 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
424 """Convert the given instance to a dict.
426 This is done via the instance's ToDict() method and additionally
427 we fill the hvparams with the cluster defaults.
429 @type instance: L{objects.Instance}
430 @param instance: an Instance object
431 @type hvp: dict or None
432 @param hvp: a dictionary with overridden hypervisor parameters
433 @type bep: dict or None
434 @param bep: a dictionary with overridden backend parameters
435 @type osp: dict or None
436 @param osp: a dictionary with overriden os parameters
438 @return: the instance dict, with the hvparams filled with the
442 idict = instance.ToDict()
443 cluster = self._cfg.GetClusterInfo()
444 idict["hvparams"] = cluster.FillHV(instance)
446 idict["hvparams"].update(hvp)
447 idict["beparams"] = cluster.FillBE(instance)
449 idict["beparams"].update(bep)
450 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
452 idict["osparams"].update(osp)
453 for nic in idict["nics"]:
454 nic['nicparams'] = objects.FillDict(
455 cluster.nicparams[constants.PP_DEFAULT],
459 def _ConnectList(self, client, node_list, call, read_timeout=None):
460 """Helper for computing node addresses.
462 @type client: L{ganeti.rpc.Client}
463 @param client: a C{Client} instance
464 @type node_list: list
465 @param node_list: the node list we should connect
467 @param call: the name of the remote procedure call, for filling in
468 correctly any eventual offline nodes' results
469 @type read_timeout: int
470 @param read_timeout: overwrites the default read timeout for the
474 all_nodes = self._cfg.GetAllNodesInfo()
478 for node in node_list:
479 if node in all_nodes:
480 if all_nodes[node].offline:
481 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
483 val = all_nodes[node].primary_ip
486 addr_list.append(val)
487 name_list.append(node)
489 client.ConnectList(name_list, address_list=addr_list,
490 read_timeout=read_timeout)
493 def _ConnectNode(self, client, node, call, read_timeout=None):
494 """Helper for computing one node's address.
496 @type client: L{ganeti.rpc.Client}
497 @param client: a C{Client} instance
499 @param node: the node we should connect
501 @param call: the name of the remote procedure call, for filling in
502 correctly any eventual offline nodes' results
503 @type read_timeout: int
504 @param read_timeout: overwrites the default read timeout for the
508 node_info = self._cfg.GetNodeInfo(node)
509 if node_info is not None:
510 if node_info.offline:
511 return RpcResult(node=node, offline=True, call=call)
512 addr = node_info.primary_ip
515 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
517 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
518 """Helper for making a multi-node call
521 body = serializer.DumpJson(args, indent=False)
522 c = Client(procedure, body, self.port)
523 skip_dict = self._ConnectList(c, node_list, procedure,
524 read_timeout=read_timeout)
525 skip_dict.update(c.GetResults())
529 def _StaticMultiNodeCall(cls, node_list, procedure, args,
530 address_list=None, read_timeout=None):
531 """Helper for making a multi-node static call
534 body = serializer.DumpJson(args, indent=False)
535 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
536 c.ConnectList(node_list, address_list=address_list,
537 read_timeout=read_timeout)
538 return c.GetResults()
540 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
541 """Helper for making a single-node call
544 body = serializer.DumpJson(args, indent=False)
545 c = Client(procedure, body, self.port)
546 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
548 # we did connect, node is not offline
549 result = c.GetResults()[node]
553 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
554 """Helper for making a single-node static call
557 body = serializer.DumpJson(args, indent=False)
558 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
559 c.ConnectNode(node, read_timeout=read_timeout)
560 return c.GetResults()[node]
564 """Compresses a string for transport over RPC.
566 Small amounts of data are not compressed.
571 @return: Encoded data to send
574 # Small amounts of data are not compressed
576 return (constants.RPC_ENCODING_NONE, data)
578 # Compress with zlib and encode in base64
579 return (constants.RPC_ENCODING_ZLIB_BASE64,
580 base64.b64encode(zlib.compress(data, 3)))
586 @_RpcTimeout(_TMO_URGENT)
587 def call_lv_list(self, node_list, vg_name):
588 """Gets the logical volumes present in a given volume group.
590 This is a multi-node call.
593 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
595 @_RpcTimeout(_TMO_URGENT)
596 def call_vg_list(self, node_list):
597 """Gets the volume group list.
599 This is a multi-node call.
602 return self._MultiNodeCall(node_list, "vg_list", [])
604 @_RpcTimeout(_TMO_NORMAL)
605 def call_storage_list(self, node_list, su_name, su_args, name, fields):
606 """Get list of storage units.
608 This is a multi-node call.
611 return self._MultiNodeCall(node_list, "storage_list",
612 [su_name, su_args, name, fields])
614 @_RpcTimeout(_TMO_NORMAL)
615 def call_storage_modify(self, node, su_name, su_args, name, changes):
616 """Modify a storage unit.
618 This is a single-node call.
621 return self._SingleNodeCall(node, "storage_modify",
622 [su_name, su_args, name, changes])
624 @_RpcTimeout(_TMO_NORMAL)
625 def call_storage_execute(self, node, su_name, su_args, name, op):
626 """Executes an operation on a storage unit.
628 This is a single-node call.
631 return self._SingleNodeCall(node, "storage_execute",
632 [su_name, su_args, name, op])
634 @_RpcTimeout(_TMO_URGENT)
635 def call_bridges_exist(self, node, bridges_list):
636 """Checks if a node has all the bridges given.
638 This method checks if all bridges given in the bridges_list are
639 present on the remote node, so that an instance that uses interfaces
640 on those bridges can be started.
642 This is a single-node call.
645 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
647 @_RpcTimeout(_TMO_NORMAL)
648 def call_instance_start(self, node, instance, hvp, bep):
649 """Starts an instance.
651 This is a single-node call.
654 idict = self._InstDict(instance, hvp=hvp, bep=bep)
655 return self._SingleNodeCall(node, "instance_start", [idict])
657 @_RpcTimeout(_TMO_NORMAL)
658 def call_instance_shutdown(self, node, instance, timeout):
659 """Stops an instance.
661 This is a single-node call.
664 return self._SingleNodeCall(node, "instance_shutdown",
665 [self._InstDict(instance), timeout])
667 @_RpcTimeout(_TMO_NORMAL)
668 def call_migration_info(self, node, instance):
669 """Gather the information necessary to prepare an instance migration.
671 This is a single-node call.
674 @param node: the node on which the instance is currently running
675 @type instance: C{objects.Instance}
676 @param instance: the instance definition
679 return self._SingleNodeCall(node, "migration_info",
680 [self._InstDict(instance)])
682 @_RpcTimeout(_TMO_NORMAL)
683 def call_accept_instance(self, node, instance, info, target):
684 """Prepare a node to accept an instance.
686 This is a single-node call.
689 @param node: the target node for the migration
690 @type instance: C{objects.Instance}
691 @param instance: the instance definition
692 @type info: opaque/hypervisor specific (string/data)
693 @param info: result for the call_migration_info call
695 @param target: target hostname (usually ip address) (on the node itself)
698 return self._SingleNodeCall(node, "accept_instance",
699 [self._InstDict(instance), info, target])
701 @_RpcTimeout(_TMO_NORMAL)
702 def call_finalize_migration(self, node, instance, info, success):
703 """Finalize any target-node migration specific operation.
705 This is called both in case of a successful migration and in case of error
706 (in which case it should abort the migration).
708 This is a single-node call.
711 @param node: the target node for the migration
712 @type instance: C{objects.Instance}
713 @param instance: the instance definition
714 @type info: opaque/hypervisor specific (string/data)
715 @param info: result for the call_migration_info call
716 @type success: boolean
717 @param success: whether the migration was a success or a failure
720 return self._SingleNodeCall(node, "finalize_migration",
721 [self._InstDict(instance), info, success])
723 @_RpcTimeout(_TMO_SLOW)
724 def call_instance_migrate(self, node, instance, target, live):
725 """Migrate an instance.
727 This is a single-node call.
730 @param node: the node on which the instance is currently running
731 @type instance: C{objects.Instance}
732 @param instance: the instance definition
734 @param target: the target node name
736 @param live: whether the migration should be done live or not (the
737 interpretation of this parameter is left to the hypervisor)
740 return self._SingleNodeCall(node, "instance_migrate",
741 [self._InstDict(instance), target, live])
743 @_RpcTimeout(_TMO_NORMAL)
744 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
745 """Reboots an instance.
747 This is a single-node call.
750 return self._SingleNodeCall(node, "instance_reboot",
751 [self._InstDict(inst), reboot_type,
754 @_RpcTimeout(_TMO_1DAY)
755 def call_instance_os_add(self, node, inst, reinstall, debug):
756 """Installs an OS on the given instance.
758 This is a single-node call.
761 return self._SingleNodeCall(node, "instance_os_add",
762 [self._InstDict(inst), reinstall, debug])
764 @_RpcTimeout(_TMO_SLOW)
765 def call_instance_run_rename(self, node, inst, old_name, debug):
766 """Run the OS rename script for an instance.
768 This is a single-node call.
771 return self._SingleNodeCall(node, "instance_run_rename",
772 [self._InstDict(inst), old_name, debug])
774 @_RpcTimeout(_TMO_URGENT)
775 def call_instance_info(self, node, instance, hname):
776 """Returns information about a single instance.
778 This is a single-node call.
781 @param node: the list of nodes to query
782 @type instance: string
783 @param instance: the instance name
785 @param hname: the hypervisor type of the instance
788 return self._SingleNodeCall(node, "instance_info", [instance, hname])
790 @_RpcTimeout(_TMO_NORMAL)
791 def call_instance_migratable(self, node, instance):
792 """Checks whether the given instance can be migrated.
794 This is a single-node call.
796 @param node: the node to query
797 @type instance: L{objects.Instance}
798 @param instance: the instance to check
802 return self._SingleNodeCall(node, "instance_migratable",
803 [self._InstDict(instance)])
805 @_RpcTimeout(_TMO_URGENT)
806 def call_all_instances_info(self, node_list, hypervisor_list):
807 """Returns information about all instances on the given nodes.
809 This is a multi-node call.
811 @type node_list: list
812 @param node_list: the list of nodes to query
813 @type hypervisor_list: list
814 @param hypervisor_list: the hypervisors to query for instances
817 return self._MultiNodeCall(node_list, "all_instances_info",
820 @_RpcTimeout(_TMO_URGENT)
821 def call_instance_list(self, node_list, hypervisor_list):
822 """Returns the list of running instances on a given node.
824 This is a multi-node call.
826 @type node_list: list
827 @param node_list: the list of nodes to query
828 @type hypervisor_list: list
829 @param hypervisor_list: the hypervisors to query for instances
832 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
834 @_RpcTimeout(_TMO_FAST)
835 def call_node_tcp_ping(self, node, source, target, port, timeout,
837 """Do a TcpPing on the remote node
839 This is a single-node call.
842 return self._SingleNodeCall(node, "node_tcp_ping",
843 [source, target, port, timeout,
846 @_RpcTimeout(_TMO_FAST)
847 def call_node_has_ip_address(self, node, address):
848 """Checks if a node has the given IP address.
850 This is a single-node call.
853 return self._SingleNodeCall(node, "node_has_ip_address", [address])
855 @_RpcTimeout(_TMO_URGENT)
856 def call_node_info(self, node_list, vg_name, hypervisor_type):
857 """Return node information.
859 This will return memory information and volume group size and free
862 This is a multi-node call.
864 @type node_list: list
865 @param node_list: the list of nodes to query
866 @type vg_name: C{string}
867 @param vg_name: the name of the volume group to ask for disk space
869 @type hypervisor_type: C{str}
870 @param hypervisor_type: the name of the hypervisor to ask for
874 return self._MultiNodeCall(node_list, "node_info",
875 [vg_name, hypervisor_type])
877 @_RpcTimeout(_TMO_NORMAL)
878 def call_etc_hosts_modify(self, node, mode, name, ip):
879 """Modify hosts file with name
882 @param node: The node to call
884 @param mode: The mode to operate. Currently "add" or "remove"
886 @param name: The host name to be modified
888 @param ip: The ip of the entry (just valid if mode is "add")
891 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
893 @_RpcTimeout(_TMO_NORMAL)
894 def call_node_verify(self, node_list, checkdict, cluster_name):
895 """Request verification of given parameters.
897 This is a multi-node call.
900 return self._MultiNodeCall(node_list, "node_verify",
901 [checkdict, cluster_name])
904 @_RpcTimeout(_TMO_FAST)
905 def call_node_start_master(cls, node, start_daemons, no_voting):
906 """Tells a node to activate itself as a master.
908 This is a single-node call.
911 return cls._StaticSingleNodeCall(node, "node_start_master",
912 [start_daemons, no_voting])
915 @_RpcTimeout(_TMO_FAST)
916 def call_node_stop_master(cls, node, stop_daemons):
917 """Tells a node to demote itself from master status.
919 This is a single-node call.
922 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
925 @_RpcTimeout(_TMO_URGENT)
926 def call_master_info(cls, node_list):
927 """Query master info.
929 This is a multi-node call.
932 # TODO: should this method query down nodes?
933 return cls._StaticMultiNodeCall(node_list, "master_info", [])
936 @_RpcTimeout(_TMO_URGENT)
937 def call_version(cls, node_list):
938 """Query node version.
940 This is a multi-node call.
943 return cls._StaticMultiNodeCall(node_list, "version", [])
945 @_RpcTimeout(_TMO_NORMAL)
946 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
947 """Request creation of a given block device.
949 This is a single-node call.
952 return self._SingleNodeCall(node, "blockdev_create",
953 [bdev.ToDict(), size, owner, on_primary, info])
955 @_RpcTimeout(_TMO_NORMAL)
956 def call_blockdev_remove(self, node, bdev):
957 """Request removal of a given block device.
959 This is a single-node call.
962 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
964 @_RpcTimeout(_TMO_NORMAL)
965 def call_blockdev_rename(self, node, devlist):
966 """Request rename of the given block devices.
968 This is a single-node call.
971 return self._SingleNodeCall(node, "blockdev_rename",
972 [(d.ToDict(), uid) for d, uid in devlist])
974 @_RpcTimeout(_TMO_NORMAL)
975 def call_blockdev_assemble(self, node, disk, owner, on_primary):
976 """Request assembling of a given block device.
978 This is a single-node call.
981 return self._SingleNodeCall(node, "blockdev_assemble",
982 [disk.ToDict(), owner, on_primary])
984 @_RpcTimeout(_TMO_NORMAL)
985 def call_blockdev_shutdown(self, node, disk):
986 """Request shutdown of a given block device.
988 This is a single-node call.
991 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
993 @_RpcTimeout(_TMO_NORMAL)
994 def call_blockdev_addchildren(self, node, bdev, ndevs):
995 """Request adding a list of children to a (mirroring) device.
997 This is a single-node call.
1000 return self._SingleNodeCall(node, "blockdev_addchildren",
1002 [disk.ToDict() for disk in ndevs]])
1004 @_RpcTimeout(_TMO_NORMAL)
1005 def call_blockdev_removechildren(self, node, bdev, ndevs):
1006 """Request removing a list of children from a (mirroring) device.
1008 This is a single-node call.
1011 return self._SingleNodeCall(node, "blockdev_removechildren",
1013 [disk.ToDict() for disk in ndevs]])
1015 @_RpcTimeout(_TMO_NORMAL)
1016 def call_blockdev_getmirrorstatus(self, node, disks):
1017 """Request status of a (mirroring) device.
1019 This is a single-node call.
1022 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1023 [dsk.ToDict() for dsk in disks])
1024 if not result.fail_msg:
1025 result.payload = [objects.BlockDevStatus.FromDict(i)
1026 for i in result.payload]
1029 @_RpcTimeout(_TMO_NORMAL)
1030 def call_blockdev_find(self, node, disk):
1031 """Request identification of a given block device.
1033 This is a single-node call.
1036 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1037 if not result.fail_msg and result.payload is not None:
1038 result.payload = objects.BlockDevStatus.FromDict(result.payload)
1041 @_RpcTimeout(_TMO_NORMAL)
1042 def call_blockdev_close(self, node, instance_name, disks):
1043 """Closes the given block devices.
1045 This is a single-node call.
1048 params = [instance_name, [cf.ToDict() for cf in disks]]
1049 return self._SingleNodeCall(node, "blockdev_close", params)
1051 @_RpcTimeout(_TMO_NORMAL)
1052 def call_blockdev_getsizes(self, node, disks):
1053 """Returns the size of the given disks.
1055 This is a single-node call.
1058 params = [[cf.ToDict() for cf in disks]]
1059 return self._SingleNodeCall(node, "blockdev_getsize", params)
1061 @_RpcTimeout(_TMO_NORMAL)
1062 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1063 """Disconnects the network of the given drbd devices.
1065 This is a multi-node call.
1068 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1069 [nodes_ip, [cf.ToDict() for cf in disks]])
1071 @_RpcTimeout(_TMO_NORMAL)
1072 def call_drbd_attach_net(self, node_list, nodes_ip,
1073 disks, instance_name, multimaster):
1074 """Disconnects the given drbd devices.
1076 This is a multi-node call.
1079 return self._MultiNodeCall(node_list, "drbd_attach_net",
1080 [nodes_ip, [cf.ToDict() for cf in disks],
1081 instance_name, multimaster])
1083 @_RpcTimeout(_TMO_SLOW)
1084 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1085 """Waits for the synchronization of drbd devices is complete.
1087 This is a multi-node call.
1090 return self._MultiNodeCall(node_list, "drbd_wait_sync",
1091 [nodes_ip, [cf.ToDict() for cf in disks]])
1093 @_RpcTimeout(_TMO_URGENT)
1094 def call_drbd_helper(self, node_list):
1095 """Gets drbd helper.
1097 This is a multi-node call.
1100 return self._MultiNodeCall(node_list, "drbd_helper", [])
1103 @_RpcTimeout(_TMO_NORMAL)
1104 def call_upload_file(cls, node_list, file_name, address_list=None):
1107 The node will refuse the operation in case the file is not on the
1110 This is a multi-node call.
1112 @type node_list: list
1113 @param node_list: the list of node names to upload to
1114 @type file_name: str
1115 @param file_name: the filename to upload
1116 @type address_list: list or None
1117 @keyword address_list: an optional list of node addresses, in order
1118 to optimize the RPC speed
1121 file_contents = utils.ReadFile(file_name)
1122 data = cls._Compress(file_contents)
1123 st = os.stat(file_name)
1124 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1125 st.st_atime, st.st_mtime]
1126 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1127 address_list=address_list)
1130 @_RpcTimeout(_TMO_NORMAL)
1131 def call_write_ssconf_files(cls, node_list, values):
1132 """Write ssconf files.
1134 This is a multi-node call.
1137 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1139 @_RpcTimeout(_TMO_FAST)
1140 def call_os_diagnose(self, node_list):
1141 """Request a diagnose of OS definitions.
1143 This is a multi-node call.
1146 return self._MultiNodeCall(node_list, "os_diagnose", [])
1148 @_RpcTimeout(_TMO_FAST)
1149 def call_os_get(self, node, name):
1150 """Returns an OS definition.
1152 This is a single-node call.
1155 result = self._SingleNodeCall(node, "os_get", [name])
1156 if not result.fail_msg and isinstance(result.payload, dict):
1157 result.payload = objects.OS.FromDict(result.payload)
1160 @_RpcTimeout(_TMO_FAST)
1161 def call_os_validate(self, required, nodes, name, checks, params):
1162 """Run a validation routine for a given OS.
1164 This is a multi-node call.
1167 return self._MultiNodeCall(nodes, "os_validate",
1168 [required, name, checks, params])
1170 @_RpcTimeout(_TMO_NORMAL)
1171 def call_hooks_runner(self, node_list, hpath, phase, env):
1172 """Call the hooks runner.
1175 - op: the OpCode instance
1176 - env: a dictionary with the environment
1178 This is a multi-node call.
1181 params = [hpath, phase, env]
1182 return self._MultiNodeCall(node_list, "hooks_runner", params)
1184 @_RpcTimeout(_TMO_NORMAL)
1185 def call_iallocator_runner(self, node, name, idata):
1186 """Call an iallocator on a remote node
1189 - name: the iallocator name
1190 - input: the json-encoded input string
1192 This is a single-node call.
1195 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1197 @_RpcTimeout(_TMO_NORMAL)
1198 def call_blockdev_grow(self, node, cf_bdev, amount):
1199 """Request a snapshot of the given block device.
1201 This is a single-node call.
1204 return self._SingleNodeCall(node, "blockdev_grow",
1205 [cf_bdev.ToDict(), amount])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a given disk to another node.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the disk object; serialized with C{ToDict()} before
      being sent
  @param dest_node: the node receiving the export
  @param dest_path: the destination path, forwarded unchanged
  @param cluster_name: the cluster name, forwarded unchanged

  """
  return self._SingleNodeCall(node, "blockdev_export",
                              [cf_bdev.ToDict(), dest_node, dest_path,
                               cluster_name])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param node: the node to contact
  @param cf_bdev: the block device object; serialized with C{ToDict()}
      before being sent

  """
  return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @param node: the node to contact
  @param instance: the instance being exported; serialized via
      C{self._InstDict}
  @param snap_disks: the snapshot disks; boolean entries are sent as-is,
      every other entry is serialized with C{ToDict()}

  """
  # Boolean placeholders cannot be serialized with ToDict(), so they are
  # passed through unchanged
  flat_disks = [disk if isinstance(disk, bool) else disk.ToDict()
                for disk in snap_disks]

  return self._SingleNodeCall(node, "finalize_export",
                              [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  @param node: the node to query
  @param path: the path holding the export

  """
  return self._SingleNodeCall(node, "export_info", [path])
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Gets the stored exports list.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query

  """
  return self._MultiNodeCall(node_list, "export_list", [])
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Requests removal of a given export.

  This is a single-node call.

  @param node: the node to contact
  @param export: the export to remove, forwarded unchanged

  """
  return self._SingleNodeCall(node, "export_remove", [export])
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  directory.

  This is a single-node call.

  @param node: the node to contact
  @param modify_ssh_setup: forwarded to the node unchanged

  """
  return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                   [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query

  """
  return self._MultiNodeCall(node_list, "node_volumes", [])
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call.

  @param node: the node to demote

  """
  return self._SingleNodeCall(node, "node_demote_from_mc", [])
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Tries to powercycle a node.

  This is a single-node call.

  @param node: the node to powercycle
  @param hypervisor: the hypervisor name, forwarded unchanged

  """
  return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
@_RpcTimeout(None)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes on which to sleep
  @param duration: how long the nodes should sleep

  """
  # No table timeout: the read timeout is derived from the requested
  # duration, with a 5 second margin for the RPC round-trip
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=int(duration + 5))
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: the directory to create

  """
  return self._SingleNodeCall(node, "file_storage_dir_create",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: the directory to remove

  """
  return self._SingleNodeCall(node, "file_storage_dir_remove",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param old_file_storage_dir: the current directory name
  @param new_file_storage_dir: the new directory name

  """
  return self._SingleNodeCall(node, "file_storage_dir_rename",
                              [old_file_storage_dir, new_file_storage_dir])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to update
  @param address_list: forwarded to the multi-node call as
      C{address_list}
  @param file_name: the name of the job queue file to update
  @param content: the file content; compressed with C{cls._Compress}
      before being sent

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, cls._Compress(content)],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge job queue.

  This is a single-node call.

  @param node: the node whose job queue should be purged

  """
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to contact
  @param address_list: forwarded to the multi-node call as
      C{address_list}
  @param rename: used directly as the RPC argument list (not wrapped);
      presumably a list of (old, new) name pairs -- TODO confirm

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  # Combine the cluster-level parameters for this hypervisor (empty if
  # none are set) with the given ones via objects.FillDict, so the nodes
  # validate a complete parameter set
  hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Creates a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: the node on which to create the certificate
  @param validity: Validity in seconds

  """
  return self._SingleNodeCall(node, "x509_cert_create", [validity])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Removes a X509 certificate.

  This is a single-node call.

  @param node: the node holding the certificate
  @type name: string
  @param name: Certificate name

  """
  return self._SingleNodeCall(node, "x509_cert_remove", [name])
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: import/export options; serialized with C{ToDict()}
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: the import destination, forwarded unchanged
  @param dest_args: destination arguments, encoded together with C{dest}
      by C{_EncodeImportExportIO}

  """
  return self._SingleNodeCall(node, "import_start",
                              [opts.ToDict(),
                               self._InstDict(instance), dest,
                               _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @param opts: import/export options; serialized with C{ToDict()}
  @param host: the remote host, forwarded unchanged
  @param port: the remote port, forwarded unchanged
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: the export source, forwarded unchanged
  @param source_args: source arguments, encoded together with C{source}
      by C{_EncodeImportExportIO}

  """
  return self._SingleNodeCall(node, "export_start",
                              [opts.ToDict(), host, port,
                               self._InstDict(instance), source,
                               _EncodeImportExportIO(source, source_args)])
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @rtype: List of L{objects.ImportExportStatus} instances
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  if not result.fail_msg:
    # A None entry means the node could not report that status; keep it as
    # None instead of trying to deserialize it
    result.payload = [None if status is None
                      else objects.ImportExportStatus.FromDict(status)
                      for status in result.payload]

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Aborts an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Import/export name

  """
  return self._SingleNodeCall(node, "impexp_abort", [name])
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Cleans up after an import or export.

  This is a single-node call.

  @type node: string
  @param node: Node name
  @type name: string
  @param name: Import/export name

  """
  return self._SingleNodeCall(node, "impexp_cleanup", [name])