4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client # pylint: disable-msg=W0611
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
56 _RPC_CLIENT_HEADERS = [
57 "Content-type: %s" % http.HTTP_APP_JSON,
60 # Various time constants for the timeout table
61 _TMO_URGENT = 60 # one minute
62 _TMO_FAST = 5 * 60 # five minutes
63 _TMO_NORMAL = 15 * 60 # 15 minutes
64 _TMO_SLOW = 3600 # one hour
68 # Timeout table that will be built later by decorators
69 # Guidelines for choosing timeouts:
70 # - call used during watcher: timeout -> 1min, _TMO_URGENT
71 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
72 # - other calls: 15 min, _TMO_NORMAL
73 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
80 """Initializes the module-global HTTP client manager.
82 Must be called before using any RPC function and while exactly one thread is
86 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87 # one thread running. This check is just a safety measure -- it doesn't
89 assert threading.activeCount() == 1, \
90 "Found more than one active thread when initializing pycURL"
92 logging.info("Using PycURL %s", pycurl.version)
94 pycurl.global_init(pycurl.GLOBAL_ALL)
98 """Stops the module-global HTTP client manager.
100 Must be called before quitting the program and while exactly one thread is
104 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the RPC client settings to a cURL handle.

  The node daemon certificate file serves as CA bundle, client
  certificate and client key (all declared as PEM). Peer verification is
  switched on, host name verification is switched off, redirects are not
  followed and the connect timeout is L{_RPC_CONNECT_TIMEOUT}.

  @param curl: the handle to configure; only its C{setopt} method is used

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Options are applied one after the other, in exactly this order
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
121 class _RpcThreadLocal(threading.local):
122 def GetHttpClientPool(self):
123 """Returns a per-thread HTTP client pool.
125 @rtype: L{http.client.HttpClientPool}
130 except AttributeError:
131 pool = http.client.HttpClientPool(_ConfigRpcCurl)
137 _thread_local = _RpcThreadLocal()
140 def _RpcTimeout(secs):
141 """Timeout decorator.
143 When applied to a rpc call_* function, it updates the global timeout
144 table with the given function/timeout.
149 assert name.startswith("call_")
150 _TIMEOUTS[name[len("call_"):]] = secs
156 """RPC-wrapper decorator.
158 When applied to a function, it runs it with the RPC system
159 initialized, and it shuts down the system afterwards. This means the
160 function must be called without RPC being initialized.
163 def wrapper(*args, **kwargs):
166 return fn(*args, **kwargs)
172 class RpcResult(object):
175 This class holds an RPC result. It is needed since in multi-node
176 calls we can't raise an exception just because one out of many
177 failed, and therefore we use this class to encapsulate the result.
179 @ivar data: the data payload, for successful results, or None
180 @ivar call: the name of the RPC call
181 @ivar node: the name of the node to which we made the call
182 @ivar offline: whether the operation failed because the node was
183 offline, as opposed to actual failure; offline=True will always
184 imply failed=True, in order to allow simpler checking if
185 the user doesn't care about the exact failure mode
186 @ivar fail_msg: the error message if the call failed
189 def __init__(self, data=None, failed=False, offline=False,
190 call=None, node=None):
191 self.offline = offline
196 self.fail_msg = "Node is marked offline"
197 self.data = self.payload = None
199 self.fail_msg = self._EnsureErr(data)
200 self.data = self.payload = None
203 if not isinstance(self.data, (tuple, list)):
204 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
208 self.fail_msg = ("RPC layer error: invalid result length (%d), "
209 "expected 2" % len(self.data))
211 elif not self.data[0]:
212 self.fail_msg = self._EnsureErr(self.data[1])
217 self.payload = data[1]
219 assert hasattr(self, "call")
220 assert hasattr(self, "data")
221 assert hasattr(self, "fail_msg")
222 assert hasattr(self, "node")
223 assert hasattr(self, "offline")
224 assert hasattr(self, "payload")
228 """Helper to ensure we return a 'True' value for error."""
232 return "No error information"
234 def Raise(self, msg, prereq=False, ecode=None):
235 """If the result has failed, raise an OpExecError.
237 This is used so that LU code doesn't have to check for each
238 result, but instead can call this function.
241 if not self.fail_msg:
244 if not msg: # one could pass None for default message
245 msg = ("Call '%s' to node '%s' has failed: %s" %
246 (self.call, self.node, self.fail_msg))
248 msg = "%s: %s" % (msg, self.fail_msg)
250 ec = errors.OpPrereqError
252 ec = errors.OpExecError
253 if ecode is not None:
257 raise ec(*args) # pylint: disable-msg=W0142
260 def _AddressLookup(node_list,
261 ssc=ssconf.SimpleStore,
262 nslookup_fn=netutils.Hostname.GetIP):
263 """Return addresses for given node names.
265 @type node_list: list
266 @param node_list: List of node names
268 @param ssc: SimpleStore class that is used to obtain node->ip mappings
269 @type nslookup_fn: callable
270 @param nslookup_fn: function used to do NS lookup
271 @rtype: list of addresses and/or None's
272 @returns: List of corresponding addresses, if found
276 iplist = ss.GetNodePrimaryIPList()
277 family = ss.GetPrimaryIPFamily()
279 ipmap = dict(entry.split() for entry in iplist)
280 for node in node_list:
281 address = ipmap.get(node)
283 address = nslookup_fn(node, family=family)
284 addresses.append(address)
292 This class, given a (remote) method name, a list of parameters and a
293 list of nodes, will contact (in parallel) all nodes, and return a
294 dict of results (key: node name, value: result).
296 One current bug is that generic failure is still signaled by
297 'False' result, which is not good. This overloading of values can
301 def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
302 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
304 self.procedure = procedure
308 self._address_lookup_fn = address_lookup_fn
def ConnectList(self, node_list, address_list=None, read_timeout=None):
  """Registers several nodes as targets of this call.

  @type node_list: list
  @param node_list: the names of the nodes to connect
  @type address_list: list or None
  @keyword address_list: addresses matching C{node_list} element by
      element; when None they are obtained from the address lookup
      function of this client
  @type read_timeout: int
  @param read_timeout: overrides the default timeout for the operation

  """
  addresses = address_list
  if addresses is None:
    # Nodes are always contacted by IP address, never by name
    addresses = self._address_lookup_fn(node_list)

  assert len(node_list) == len(addresses), \
         "Name and address lists must have the same length"

  for (node_name, node_address) in zip(node_list, addresses):
    self.ConnectNode(node_name, node_address, read_timeout=read_timeout)
332 def ConnectNode(self, name, address=None, read_timeout=None):
333 """Add a node to the target list.
336 @param name: the node name
338 @param address: the node address, if known
339 @type read_timeout: int
340 @param read_timeout: overwrites default timeout for operation
344 # Always use IP address instead of node name
345 address = self._address_lookup_fn([name])[0]
347 assert(address is not None)
349 if read_timeout is None:
350 read_timeout = _TIMEOUTS[self.procedure]
352 self._request[name] = \
353 http.client.HttpClientRequest(str(address), self.port,
354 http.HTTP_PUT, str("/%s" % self.procedure),
355 headers=_RPC_CLIENT_HEADERS,
356 post_data=str(self.body),
357 read_timeout=read_timeout)
359 def GetResults(self, http_pool=None):
360 """Call nodes and return results.
363 @return: List of RPC results
367 http_pool = _thread_local.GetHttpClientPool()
369 http_pool.ProcessRequests(self._request.values())
373 for name, req in self._request.iteritems():
374 if req.success and req.resp_status_code == http.HTTP_OK:
375 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
376 node=name, call=self.procedure)
379 # TODO: Better error reporting
385 logging.error("RPC error in %s from node %s: %s",
386 self.procedure, name, msg)
387 results[name] = RpcResult(data=msg, failed=True, node=name,
393 def _EncodeImportExportIO(ieio, ieioargs):
394 """Encodes import/export I/O information.
397 if ieio == constants.IEIO_RAW_DISK:
398 assert len(ieioargs) == 1
399 return (ieioargs[0].ToDict(), )
401 if ieio == constants.IEIO_SCRIPT:
402 assert len(ieioargs) == 2
403 return (ieioargs[0].ToDict(), ieioargs[1])
408 class RpcRunner(object):
409 """RPC runner class"""
411 def __init__(self, cfg):
412 """Initializes the RPC runner.
414 @type cfg: C{config.ConfigWriter}
415 @param cfg: the configuration object that will be used to get data
420 self.port = netutils.GetDaemonPort(constants.NODED)
422 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
423 """Convert the given instance to a dict.
425 This is done via the instance's ToDict() method and additionally
426 we fill the hvparams with the cluster defaults.
428 @type instance: L{objects.Instance}
429 @param instance: an Instance object
430 @type hvp: dict or None
431 @param hvp: a dictionary with overridden hypervisor parameters
432 @type bep: dict or None
433 @param bep: a dictionary with overridden backend parameters
434 @type osp: dict or None
435 @param osp: a dictionary with overridden os parameters
437 @return: the instance dict, with the hvparams filled with the
441 idict = instance.ToDict()
442 cluster = self._cfg.GetClusterInfo()
443 idict["hvparams"] = cluster.FillHV(instance)
445 idict["hvparams"].update(hvp)
446 idict["beparams"] = cluster.FillBE(instance)
448 idict["beparams"].update(bep)
449 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
451 idict["osparams"].update(osp)
452 for nic in idict["nics"]:
453 nic['nicparams'] = objects.FillDict(
454 cluster.nicparams[constants.PP_DEFAULT],
458 def _ConnectList(self, client, node_list, call, read_timeout=None):
459 """Helper for computing node addresses.
461 @type client: L{ganeti.rpc.Client}
462 @param client: a C{Client} instance
463 @type node_list: list
464 @param node_list: the node list we should connect
466 @param call: the name of the remote procedure call, for filling in
467 correctly any eventual offline nodes' results
468 @type read_timeout: int
469 @param read_timeout: overwrites the default read timeout for the
473 all_nodes = self._cfg.GetAllNodesInfo()
477 for node in node_list:
478 if node in all_nodes:
479 if all_nodes[node].offline:
480 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
482 val = all_nodes[node].primary_ip
485 addr_list.append(val)
486 name_list.append(node)
488 client.ConnectList(name_list, address_list=addr_list,
489 read_timeout=read_timeout)
492 def _ConnectNode(self, client, node, call, read_timeout=None):
493 """Helper for computing one node's address.
495 @type client: L{ganeti.rpc.Client}
496 @param client: a C{Client} instance
498 @param node: the node we should connect
500 @param call: the name of the remote procedure call, for filling in
501 correctly any eventual offline nodes' results
502 @type read_timeout: int
503 @param read_timeout: overwrites the default read timeout for the
507 node_info = self._cfg.GetNodeInfo(node)
508 if node_info is not None:
509 if node_info.offline:
510 return RpcResult(node=node, offline=True, call=call)
511 addr = node_info.primary_ip
514 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
516 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
517 """Helper for making a multi-node call
520 body = serializer.DumpJson(args, indent=False)
521 c = Client(procedure, body, self.port)
522 skip_dict = self._ConnectList(c, node_list, procedure,
523 read_timeout=read_timeout)
524 skip_dict.update(c.GetResults())
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
  """Runs a remote procedure on several nodes without a configuration.

  The arguments are serialized to JSON and sent through a L{Client}
  that talks to the node daemon port; the nodes (and, optionally, their
  addresses) are handed to the client as given.

  @param node_list: the names of the nodes to contact
  @param procedure: the name of the remote procedure
  @param args: the arguments of the procedure, serialized via
      L{serializer.DumpJson}
  @param address_list: passed on to L{Client.ConnectList}
  @param read_timeout: passed on to L{Client.ConnectList}
  @return: whatever L{Client.GetResults} returns

  """
  payload = serializer.DumpJson(args, indent=False)
  noded_port = netutils.GetDaemonPort(constants.NODED)

  client = Client(procedure, payload, noded_port)
  client.ConnectList(node_list, address_list=address_list,
                     read_timeout=read_timeout)

  return client.GetResults()
539 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
540 """Helper for making a single-node call
543 body = serializer.DumpJson(args, indent=False)
544 c = Client(procedure, body, self.port)
545 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
547 # we did connect, node is not offline
548 result = c.GetResults()[node]
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
  """Runs a remote procedure on one node without a configuration.

  The arguments are serialized to JSON and sent through a L{Client}
  that talks to the node daemon port.

  @param node: the name of the node to contact
  @param procedure: the name of the remote procedure
  @param args: the arguments of the procedure, serialized via
      L{serializer.DumpJson}
  @param read_timeout: passed on to L{Client.ConnectNode}
  @return: the entry for C{node} in what L{Client.GetResults} returns

  """
  payload = serializer.DumpJson(args, indent=False)
  noded_port = netutils.GetDaemonPort(constants.NODED)

  client = Client(procedure, payload, noded_port)
  client.ConnectNode(node, read_timeout=read_timeout)

  all_results = client.GetResults()
  return all_results[node]
563 """Compresses a string for transport over RPC.
565 Small amounts of data are not compressed.
570 @return: Encoded data to send
573 # Small amounts of data are not compressed
575 return (constants.RPC_ENCODING_NONE, data)
577 # Compress with zlib and encode in base64
578 return (constants.RPC_ENCODING_ZLIB_BASE64,
579 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Lists the logical volumes found in a volume group.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param vg_name: the volume group whose logical volumes are requested
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Retrieves the volume group list.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = []
  return self._MultiNodeCall(node_list, "vg_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Retrieves a list of storage units.

  This is a multi-node call. C{su_name}, C{su_args}, C{name} and
  C{fields} are sent to the remote procedure unchanged, in this order.

  @param node_list: the nodes to query
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Modifies a storage unit.

  This is a single-node call. C{su_name}, C{su_args}, C{name} and
  C{changes} are sent to the remote procedure unchanged, in this order.

  @param node: the node on which to run the call
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Runs an operation on a storage unit.

  This is a single-node call. C{su_name}, C{su_args}, C{name} and C{op}
  are sent to the remote procedure unchanged, in this order.

  @param node: the node on which to run the call
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Verifies that a node has all the given bridges.

  The remote node is asked whether every bridge in C{bridges_list} is
  present, so that an instance using interfaces on those bridges can be
  started there.

  This is a single-node call.

  @param node: the node to check
  @param bridges_list: the bridges that must be present
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
  """Starts an instance.

  This is a single-node call.

  @param node: the node on which to run the call
  @param instance: the instance, converted through L{_InstDict}
  @param hvp: handed to L{_InstDict} as C{hvp}
  @param bep: handed to L{_InstDict} as C{bep}
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [self._InstDict(instance, hvp=hvp, bep=bep)]
  return self._SingleNodeCall(node, "instance_start", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Stops an instance.

  This is a single-node call.

  @param node: the node on which to run the call
  @param instance: the instance, converted through L{_InstDict}
  @param timeout: sent to the remote procedure unchanged
  @return: the result of L{_SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_shutdown",
                              [inst_dict, timeout])
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Collects the information needed to prepare an instance migration.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "migration_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepares a node to accept an instance.

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @param target: target hostname (usually ip address) (on the node itself)
  @return: the result of L{_SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "accept_instance",
                              [inst_dict, info, target])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
  """Finalizes any target-node specific migration operation.

  It is called after a successful migration as well as after a failed
  one (in which case the migration should be aborted).

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure
  @return: the result of L{_SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "finalize_migration",
                              [inst_dict, info, success])
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrates an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @return: the result of L{_SingleNodeCall}

  """
  inst_dict = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_migrate",
                              [inst_dict, target, live])
742 @_RpcTimeout(_TMO_NORMAL)
743 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
744 """Reboots an instance.
746 This is a single-node call.
749 return self._SingleNodeCall(node, "instance_reboot",
750 [self._InstDict(inst), reboot_type,
@_RpcTimeout(_TMO_1DAY)
def call_instance_os_add(self, node, inst, reinstall, debug):
  """Installs an OS on the given instance.

  This is a single-node call.

  @param node: the node on which to run the call
  @param inst: the instance, converted through L{_InstDict}
  @param reinstall: sent to the remote procedure unchanged
  @param debug: sent to the remote procedure unchanged
  @return: the result of L{_SingleNodeCall}

  """
  inst_dict = self._InstDict(inst)
  return self._SingleNodeCall(node, "instance_os_add",
                              [inst_dict, reinstall, debug])
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Runs the OS rename script for an instance.

  This is a single-node call.

  @param node: the node on which to run the call
  @param inst: the instance, converted through L{_InstDict}
  @param old_name: sent to the remote procedure unchanged
  @param debug: sent to the remote procedure unchanged
  @return: the result of L{_SingleNodeCall}

  """
  inst_dict = self._InstDict(inst)
  return self._SingleNodeCall(node, "instance_run_rename",
                              [inst_dict, old_name, debug])
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Retrieves information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [instance, hname]
  return self._SingleNodeCall(node, "instance_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Asks a node whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "instance_migratable", rpc_args)
804 @_RpcTimeout(_TMO_URGENT)
805 def call_all_instances_info(self, node_list, hypervisor_list):
806 """Returns information about all instances on the given nodes.
808 This is a multi-node call.
810 @type node_list: list
811 @param node_list: the list of nodes to query
812 @type hypervisor_list: list
813 @param hypervisor_list: the hypervisors to query for instances
816 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Retrieves the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", rpc_args)
833 @_RpcTimeout(_TMO_FAST)
834 def call_node_tcp_ping(self, node, source, target, port, timeout,
836 """Do a TcpPing on the remote node
838 This is a single-node call.
841 return self._SingleNodeCall(node, "node_tcp_ping",
842 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Asks a node whether it has the given IP address.

  This is a single-node call.

  @param node: the node to query
  @param address: the IP address to look for
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Retrieves node information.

  The answer covers memory information as well as volume group size
  and free space.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
  """Changes the entry for a name in the hosts file of a node.

  @param node: The node to call
  @param mode: The mode to operate. Currently "add" or "remove"
  @param name: The host name to be modified
  @param ip: The ip of the entry (just valid if mode is "add")
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [mode, name, ip]
  return self._SingleNodeCall(node, "etc_hosts_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Asks nodes to verify the given parameters.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param checkdict: sent to the remote procedure unchanged
  @param cluster_name: sent to the remote procedure unchanged
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Asks a node to activate itself as a master.

  This is a single-node call which goes through
  L{_StaticSingleNodeCall}, i.e. it does not use a configuration object.

  @param node: the node to contact
  @param start_daemons: sent to the remote procedure unchanged
  @param no_voting: sent to the remote procedure unchanged
  @return: the result of L{_StaticSingleNodeCall}

  """
  rpc_args = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
  """Asks a node to demote itself from master status.

  This is a single-node call which goes through
  L{_StaticSingleNodeCall}, i.e. it does not use a configuration object.

  @param node: the node to contact
  @param stop_daemons: sent to the remote procedure unchanged
  @return: the result of L{_StaticSingleNodeCall}

  """
  rpc_args = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
  """Queries the master information of the given nodes.

  This is a multi-node call which goes through
  L{_StaticMultiNodeCall}, i.e. it does not use a configuration object.

  @param node_list: the nodes to query
  @return: the result of L{_StaticMultiNodeCall}

  """
  # TODO: should this method query down nodes?
  rpc_args = []
  return cls._StaticMultiNodeCall(node_list, "master_info", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
  """Queries the version of the given nodes.

  This is a multi-node call which goes through
  L{_StaticMultiNodeCall}, i.e. it does not use a configuration object.

  @param node_list: the nodes to query
  @return: the result of L{_StaticMultiNodeCall}

  """
  rpc_args = []
  return cls._StaticMultiNodeCall(node_list, "version", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Requests the creation of a given block device.

  This is a single-node call.

  @param node: the node on which to run the call
  @param bdev: the block device; serialized through its C{ToDict} method
  @return: the result of L{_SingleNodeCall}

  """
  bdev_dict = bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_create",
                              [bdev_dict, size, owner, on_primary, info])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Requests the removal of a given block device.

  This is a single-node call.

  @param node: the node on which to run the call
  @param bdev: the block device; serialized through its C{ToDict} method
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Requests the rename of the given block devices.

  This is a single-node call.

  @param node: the node on which to run the call
  @param devlist: pairs of (device, uid); each device is serialized
      through its C{ToDict} method, the second element is sent as is
  @return: the result of L{_SingleNodeCall}

  """
  renames = [(dev.ToDict(), unique_id) for (dev, unique_id) in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", renames)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Requests the assembling of a given block device.

  This is a single-node call.

  @param node: the node on which to run the call
  @param disk: the block device; serialized through its C{ToDict} method
  @param owner: sent to the remote procedure unchanged
  @param on_primary: sent to the remote procedure unchanged
  @return: the result of L{_SingleNodeCall}

  """
  disk_dict = disk.ToDict()
  return self._SingleNodeCall(node, "blockdev_assemble",
                              [disk_dict, owner, on_primary])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Requests the shutdown of a given block device.

  This is a single-node call.

  @param node: the node on which to run the call
  @param disk: the block device; serialized through its C{ToDict} method
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
992 @_RpcTimeout(_TMO_NORMAL)
993 def call_blockdev_addchildren(self, node, bdev, ndevs):
994 """Request adding a list of children to a (mirroring) device.
996 This is a single-node call.
999 return self._SingleNodeCall(node, "blockdev_addchildren",
1001 [disk.ToDict() for disk in ndevs]])
1003 @_RpcTimeout(_TMO_NORMAL)
1004 def call_blockdev_removechildren(self, node, bdev, ndevs):
1005 """Request removing a list of children from a (mirroring) device.
1007 This is a single-node call.
1010 return self._SingleNodeCall(node, "blockdev_removechildren",
1012 [disk.ToDict() for disk in ndevs]])
1014 @_RpcTimeout(_TMO_NORMAL)
1015 def call_blockdev_getmirrorstatus(self, node, disks):
1016 """Request status of a (mirroring) device.
1018 This is a single-node call.
1021 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1022 [dsk.ToDict() for dsk in disks])
1023 if not result.fail_msg:
1024 result.payload = [objects.BlockDevStatus.FromDict(i)
1025 for i in result.payload]
1028 @_RpcTimeout(_TMO_NORMAL)
1029 def call_blockdev_find(self, node, disk):
1030 """Request identification of a given block device.
1032 This is a single-node call.
1035 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1036 if not result.fail_msg and result.payload is not None:
1037 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes the given block devices.

  This is a single-node call.

  @param node: the node on which to run the call
  @param instance_name: sent to the remote procedure unchanged
  @param disks: the block devices; each one is serialized through its
      C{ToDict} method
  @return: the result of L{_SingleNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Returns the size of the given disks.

  This is a single-node call.

  The method carries the name of the remote procedure
  ("blockdev_getsize") because the timeout decorator registers the
  timeout under the function name minus the "call_" prefix, while the
  RPC client looks the timeout up by procedure name. Under the former
  name only "blockdev_getsizes" was registered, so the lookup for this
  procedure could not succeed.

  @param node: the node to query
  @param disks: the disks to query; each one is serialized through its
      C{ToDict} method
  @return: the result of L{_SingleNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])

# Former (plural) name, kept so that existing callers continue to work
call_blockdev_getsizes = call_blockdev_getsize
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given drbd devices.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: sent to the remote procedure unchanged
  @param disks: the drbd devices; each one is serialized through its
      C{ToDict} method
  @return: the result of L{_MultiNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches the network of the given drbd devices.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: sent to the remote procedure unchanged
  @param disks: the drbd devices; each one is serialized through its
      C{ToDict} method
  @param instance_name: sent to the remote procedure unchanged
  @param multimaster: sent to the remote procedure unchanged
  @return: the result of L{_MultiNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  rpc_args = [nodes_ip, disk_dicts, instance_name, multimaster]
  return self._MultiNodeCall(node_list, "drbd_attach_net", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of drbd devices is complete.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param nodes_ip: sent to the remote procedure unchanged
  @param disks: the drbd devices; each one is serialized through its
      C{ToDict} method
  @return: the result of L{_MultiNodeCall}

  """
  disk_dicts = [disk.ToDict() for disk in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Retrieves the drbd helper.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = []
  return self._MultiNodeCall(node_list, "drbd_helper", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Uploads a local file to the given nodes.

  The file is read, its contents are encoded through L{_Compress} and
  sent along with the file name and the mode, owner, group, access time
  and modification time reported by C{os.stat}.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed
  @return: the result of L{_StaticMultiNodeCall}

  """
  encoded = cls._Compress(utils.ReadFile(file_name))
  # The file is stat'ed only after its contents have been read
  stat_info = os.stat(file_name)

  rpc_args = [
    file_name,
    encoded,
    stat_info.st_mode,
    stat_info.st_uid,
    stat_info.st_gid,
    stat_info.st_atime,
    stat_info.st_mtime,
    ]

  return cls._StaticMultiNodeCall(node_list, "upload_file", rpc_args,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Asks nodes to write the ssconf files.

  This is a multi-node call which goes through
  L{_StaticMultiNodeCall}, i.e. it does not use a configuration object.

  @param node_list: the nodes to contact
  @param values: sent to the remote procedure unchanged
  @return: the result of L{_StaticMultiNodeCall}

  """
  rpc_args = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Requests a diagnose of the OS definitions.

  This is a multi-node call.

  @param node_list: the nodes to query
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", rpc_args)
1147 @_RpcTimeout(_TMO_FAST)
1148 def call_os_get(self, node, name):
1149 """Returns an OS definition.
1151 This is a single-node call.
1154 result = self._SingleNodeCall(node, "os_get", [name])
1155 if not result.fail_msg and isinstance(result.payload, dict):
1156 result.payload = objects.OS.FromDict(result.payload)
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Runs a validation routine for a given OS.

  This is a multi-node call. C{required}, C{name}, C{checks} and
  C{params} are sent to the remote procedure unchanged, in this order.

  @param nodes: the nodes on which to run the validation
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Calls the hooks runner.

  This is a multi-node call.

  @param node_list: the nodes on which to run the hooks
  @param hpath: sent to the remote procedure unchanged
  @param phase: sent to the remote procedure unchanged
  @param env: a dictionary with the environment
  @return: the result of L{_MultiNodeCall}

  """
  rpc_args = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Calls an iallocator on a remote node.

  This is a single-node call.

  @param node: the node on which to run the iallocator
  @param name: the iallocator name
  @param idata: the json-encoded input string
  @return: the result of L{_SingleNodeCall}

  """
  rpc_args = [name, idata]
  return self._SingleNodeCall(node, "iallocator_runner", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Requests growing of the given block device.

  This is a single-node call.

  @param node: the node on which to run the call
  @param cf_bdev: the block device; serialized through its C{ToDict}
      method
  @param amount: sent to the remote procedure unchanged
  @return: the result of L{_SingleNodeCall}

  """
  bdev_dict = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_grow", [bdev_dict, amount])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a given disk to another node.

  This is a single-node call.

  @param node: the node the request is sent to
  @param cf_bdev: the disk to export; sent in its C{ToDict()} form
  @param dest_node: forwarded unchanged to the remote procedure
      (presumably the node receiving the export)
  @param dest_path: forwarded unchanged to the remote procedure
  @param cluster_name: forwarded unchanged to the remote procedure

  """
  return self._SingleNodeCall(node, "blockdev_export",
                              [cf_bdev.ToDict(), dest_node, dest_path,
                               cluster_name])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Ask a node to snapshot the given block device.

  This is a single-node call.

  @param node: the node the request is sent to
  @param cf_bdev: the block device; sent in its C{ToDict()} form

  """
  serialized_bdev = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_snapshot", [serialized_bdev])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @param node: the node the request is sent to
  @param instance: the instance; sent in the form produced by
      C{self._InstDict}
  @param snap_disks: iterable whose entries are either booleans or
      objects providing C{ToDict()}

  """
  # Boolean entries are passed through as-is; every other entry is
  # serialized through its ToDict() method before being sent.
  flat_disks = []
  for disk in snap_disks:
    if isinstance(disk, bool):
      flat_disks.append(disk)
    else:
      flat_disks.append(disk.ToDict())

  return self._SingleNodeCall(node, "finalize_export",
                              [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Query a node for the export information stored under a path.

  This is a single-node call.

  @param node: the node the request is sent to
  @param path: forwarded unchanged to the remote procedure

  """
  rpc_args = [path]
  return self._SingleNodeCall(node, "export_info", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Fetch the list of stored exports from the given nodes.

  This is a multi-node call.

  @param node_list: the nodes the request is sent to

  """
  # The remote procedure takes no arguments.
  rpc_args = []
  return self._MultiNodeCall(node_list, "export_list", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Ask a node to remove the given export.

  This is a single-node call.

  @param node: the node the request is sent to
  @param export: forwarded unchanged to the remote procedure

  """
  rpc_args = [export]
  return self._SingleNodeCall(node, "export_remove", rpc_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  kept on that node.

  This is a single-node call.

  @param node: the node the request is sent to
  @param modify_ssh_setup: forwarded unchanged to the remote procedure

  """
  return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                   [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Retrieve all volumes present on the given node(s).

  This is a multi-node call.

  @param node_list: the nodes the request is sent to

  """
  # The remote procedure takes no arguments.
  rpc_args = []
  return self._MultiNodeCall(node_list, "node_volumes", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Ask a node to give up the master candidate role.

  This is a single-node call.

  @param node: the node the request is sent to

  """
  # The remote procedure takes no arguments.
  rpc_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Attempt to powercycle a node.

  This is a single-node call.

  @param node: the node the request is sent to
  @param hypervisor: forwarded unchanged to the remote procedure

  """
  rpc_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", rpc_args)
def call_test_delay(self, node_list, duration):
  """Make the given node(s) sleep for a fixed time.

  This is a multi-node call.

  @param node_list: the nodes the request is sent to
  @param duration: how long to sleep; forwarded unchanged to the remote
      procedure

  """
  # Allow the call to outlast the remote sleep: the read timeout is the
  # duration plus five, truncated to an integer.
  timeout = int(duration + 5)
  rpc_args = [duration]
  return self._MultiNodeCall(node_list, "test_delay", rpc_args,
                             read_timeout=timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node the request is sent to
  @param file_storage_dir: the directory to create; forwarded unchanged
      to the remote procedure

  """
  return self._SingleNodeCall(node, "file_storage_dir_create",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node the request is sent to
  @param file_storage_dir: the directory to remove; forwarded unchanged
      to the remote procedure

  """
  return self._SingleNodeCall(node, "file_storage_dir_remove",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename a file storage directory on a node.

  This is a single-node call.

  @param node: the node the request is sent to
  @param old_file_storage_dir: sent as the first remote argument
  @param new_file_storage_dir: sent as the second remote argument

  """
  rpc_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call.

  @param node_list: the nodes the request is sent to
  @param address_list: passed through as the C{address_list} keyword of
      the underlying multi-node call
  @param file_name: forwarded unchanged to the remote procedure
  @param content: the file content; passed through C{cls._Compress}
      before being sent

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, cls._Compress(content)],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge the job queue on a node.

  This is a single-node call.

  @param node: the node the request is sent to

  """
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call.

  @param node_list: the nodes the request is sent to
  @param address_list: passed through as the C{address_list} keyword of
      the underlying multi-node call
  @param rename: used directly as the remote argument list (it is not
      wrapped in another list)

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  # Combine the cluster-level parameters stored for this hypervisor (an
  # empty dict when there are none) with the ones given by the caller, and
  # send the combined dict for validation.
  hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Ask a node to create a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: the node the request is sent to
  @param validity: Validity in seconds

  """
  rpc_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Ask a node to remove an X509 certificate.

  This is a single-node call.

  @param node: the node the request is sent to
  @param name: Certificate name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @param node: Node name
  @param opts: import options; sent in their C{ToDict()} form
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: forwarded to the remote procedure and also used to select
      how C{dest_args} is encoded
  @param dest_args: encoded with L{_EncodeImportExportIO} before being
      sent

  """
  return self._SingleNodeCall(node, "import_start",
                              [opts.ToDict(),
                               self._InstDict(instance), dest,
                               _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Launch an export daemon on a node.

  This is a single-node call.

  @param node: Node name
  @param opts: export options; sent in their C{ToDict()} form
  @param host: forwarded unchanged to the remote procedure
  @param port: forwarded unchanged to the remote procedure
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: forwarded to the remote procedure and also used to
      select how C{source_args} is encoded
  @param source_args: encoded with L{_EncodeImportExportIO} before being
      sent

  """
  rpc_args = [
    opts.ToDict(),
    host,
    port,
    self._InstDict(instance),
    source,
    _EncodeImportExportIO(source, source_args),
    ]
  return self._SingleNodeCall(node, "export_start", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @rtype: List of L{objects.ImportExportStatus} instances
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  if not result.fail_msg:
    # Decode each entry of a successful answer; entries for which no
    # status could be retrieved arrive as None and are kept as None.
    decoded = []

    for i in result.payload:
      if i is None:
        decoded.append(None)
      else:
        decoded.append(objects.ImportExportStatus.FromDict(i))

    result.payload = decoded

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Abort a running import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "impexp_abort", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Clean up once an import or export is done.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  rpc_args = [name]
  return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)