4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
51 # pylint has a bug here, doesn't see this import
52 import ganeti.http.client # pylint: disable=W0611
55 # Timeout for connecting to nodes (seconds)
56 _RPC_CONNECT_TIMEOUT = 5
58 _RPC_CLIENT_HEADERS = [
59 "Content-type: %s" % http.HTTP_APP_JSON,
63 # Various time constants for the timeout table
64 _TMO_URGENT = 60 # one minute
65 _TMO_FAST = 5 * 60 # five minutes
66 _TMO_NORMAL = 15 * 60 # 15 minutes
67 _TMO_SLOW = 3600 # one hour
71 # Timeout table that will be built later by decorators
72 # Guidelines for choosing timeouts:
73 # - call used during watcher: timeout -> 1min, _TMO_URGENT
74 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75 # - other calls: 15 min, _TMO_NORMAL
76 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
81 #: Special value to describe an offline host
86 """Initializes the module-global HTTP client manager.
88 Must be called before using any RPC function and while exactly one thread is
92 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93 # one thread running. This check is just a safety measure -- it doesn't
95 assert threading.activeCount() == 1, \
96 "Found more than one active thread when initializing pycURL"
98 logging.info("Using PycURL %s", pycurl.version)
100 pycurl.global_init(pycurl.GLOBAL_ALL)
104 """Stops the module-global HTTP client manager.
106 Must be called before quitting the program and while exactly one thread is
110 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the node-daemon connection settings to a curl handle.

  The node daemon certificate file serves at the same time as the trusted
  CA, the client certificate and the client key. The peer certificate is
  verified (SSL_VERIFYPEER), but SSL_VERIFYHOST is 0, so the certificate
  name is not checked against the host. Redirects are not followed.

  @param curl: object exposing C{setopt}; presumably a pycurl handle created
      by the L{http.client.HttpClientPool} this function is passed to

  """
  cert_path = str(constants.NODED_CERT_FILE)
  # Applied in exactly this order, one setopt call per entry
  settings = (
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    )
  for (option, value) in settings:
    curl.setopt(option, value)
# Aliasing this module avoids the following warning by epydoc: "Warning: No
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
# The alias is only needed for the class definition and is meant to be removed
# again right after it.
_threading = threading
132 class _RpcThreadLocal(_threading.local):
133 def GetHttpClientPool(self):
134 """Returns a per-thread HTTP client pool.
136 @rtype: L{http.client.HttpClientPool}
141 except AttributeError:
142 pool = http.client.HttpClientPool(_ConfigRpcCurl)
148 # Remove module alias (see above)
#: Module-wide holder for per-thread RPC state; _RpcThreadLocal derives from
#: threading.local, so attributes stored on it are private to each thread
_thread_local = _RpcThreadLocal()
155 def _RpcTimeout(secs):
156 """Timeout decorator.
158 When applied to a rpc call_* function, it updates the global timeout
159 table with the given function/timeout.
164 assert name.startswith("call_")
165 _TIMEOUTS[name[len("call_"):]] = secs
171 """RPC-wrapper decorator.
173 When applied to a function, it runs it with the RPC system
174 initialized, and it shutsdown the system afterwards. This means the
175 function must be called without RPC being initialized.
178 def wrapper(*args, **kwargs):
181 return fn(*args, **kwargs)
188 """Compresses a string for transport over RPC.
190 Small amounts of data are not compressed.
195 @return: Encoded data to send
198 # Small amounts of data are not compressed
200 return (constants.RPC_ENCODING_NONE, data)
202 # Compress with zlib and encode in base64
203 return (constants.RPC_ENCODING_ZLIB_BASE64,
204 base64.b64encode(zlib.compress(data, 3)))
207 class RpcResult(object):
210 This class holds an RPC result. It is needed since in multi-node
211 calls we can't raise an exception just because one one out of many
212 failed, and therefore we use this class to encapsulate the result.
214 @ivar data: the data payload, for successful results, or None
215 @ivar call: the name of the RPC call
216 @ivar node: the name of the node to which we made the call
217 @ivar offline: whether the operation failed because the node was
218 offline, as opposed to actual failure; offline=True will always
219 imply failed=True, in order to allow simpler checking if
220 the user doesn't care about the exact failure mode
221 @ivar fail_msg: the error message if the call failed
224 def __init__(self, data=None, failed=False, offline=False,
225 call=None, node=None):
226 self.offline = offline
231 self.fail_msg = "Node is marked offline"
232 self.data = self.payload = None
234 self.fail_msg = self._EnsureErr(data)
235 self.data = self.payload = None
238 if not isinstance(self.data, (tuple, list)):
239 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
243 self.fail_msg = ("RPC layer error: invalid result length (%d), "
244 "expected 2" % len(self.data))
246 elif not self.data[0]:
247 self.fail_msg = self._EnsureErr(self.data[1])
252 self.payload = data[1]
254 for attr_name in ["call", "data", "fail_msg",
255 "node", "offline", "payload"]:
256 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
260 """Helper to ensure we return a 'True' value for error."""
264 return "No error information"
266 def Raise(self, msg, prereq=False, ecode=None):
267 """If the result has failed, raise an OpExecError.
269 This is used so that LU code doesn't have to check for each
270 result, but instead can call this function.
273 if not self.fail_msg:
276 if not msg: # one could pass None for default message
277 msg = ("Call '%s' to node '%s' has failed: %s" %
278 (self.call, self.node, self.fail_msg))
280 msg = "%s: %s" % (msg, self.fail_msg)
282 ec = errors.OpPrereqError
284 ec = errors.OpExecError
285 if ecode is not None:
289 raise ec(*args) # pylint: disable=W0142
292 def _SsconfResolver(node_list,
293 ssc=ssconf.SimpleStore,
294 nslookup_fn=netutils.Hostname.GetIP):
295 """Return addresses for given node names.
297 @type node_list: list
298 @param node_list: List of node names
300 @param ssc: SimpleStore class that is used to obtain node->ip mappings
301 @type nslookup_fn: callable
302 @param nslookup_fn: function use to do NS lookup
303 @rtype: list of tuple; (string, string)
304 @return: List of tuples containing node name and IP address
308 iplist = ss.GetNodePrimaryIPList()
309 family = ss.GetPrimaryIPFamily()
310 ipmap = dict(entry.split() for entry in iplist)
313 for node in node_list:
316 ip = nslookup_fn(node, family=family)
317 result.append((node, ip))
322 class _StaticResolver:
323 def __init__(self, addresses):
324 """Initializes this class.
327 self._addresses = addresses
329 def __call__(self, hosts):
330 """Returns static addresses for hosts.
333 assert len(hosts) == len(self._addresses)
334 return zip(hosts, self._addresses)
337 def _CheckConfigNode(name, node):
338 """Checks if a node is online.
341 @param name: Node name
342 @type node: L{objects.Node} or None
343 @param node: Node object
347 # Depend on DNS for name resolution
356 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357 """Calculate node addresses using configuration.
360 # Special case for single-host lookups
363 return [_CheckConfigNode(name, single_node_fn(name))]
365 all_nodes = all_nodes_fn()
366 return [_CheckConfigNode(name, all_nodes.get(name, None))
371 def __init__(self, resolver, port, lock_monitor_cb=None):
372 """Initializes this class.
374 @param resolver: callable accepting a list of hostnames, returning a list
375 of tuples containing name and IP address (IP address can be the name or
376 the special value L{_OFFLINE} to mark offline machines)
378 @param port: TCP port
379 @param lock_monitor_cb: Callable for registering with lock monitor
382 self._resolver = resolver
384 self._lock_monitor_cb = lock_monitor_cb
387 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
388 """Prepares requests by sorting offline hosts into separate list.
394 for (name, ip) in hosts:
396 # Node is marked as offline
397 results[name] = RpcResult(node=name, offline=True, call=procedure)
400 http.client.HttpClientRequest(str(ip), port,
401 http.HTTP_PUT, str("/%s" % procedure),
402 headers=_RPC_CLIENT_HEADERS,
404 read_timeout=read_timeout,
405 nicename="%s/%s" % (name, procedure))
407 return (results, requests)
410 def _CombineResults(results, requests, procedure):
411 """Combines pre-computed results for offline hosts with actual call results.
414 for name, req in requests.items():
415 if req.success and req.resp_status_code == http.HTTP_OK:
416 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
417 node=name, call=procedure)
419 # TODO: Better error reporting
425 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
426 host_result = RpcResult(data=msg, failed=True, node=name,
429 results[name] = host_result
433 def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
434 """Makes an RPC request to a number of nodes.
436 @type hosts: sequence
437 @param hosts: Hostnames
438 @type procedure: string
439 @param procedure: Request path
441 @param body: Request body
442 @type read_timeout: int or None
443 @param read_timeout: Read timeout for request
446 assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
449 http_pool = _thread_local.GetHttpClientPool()
451 if read_timeout is None:
452 read_timeout = _TIMEOUTS[procedure]
454 (results, requests) = \
455 self._PrepareRequests(self._resolver(hosts), self._port, procedure,
456 str(body), read_timeout)
458 http_pool.ProcessRequests(requests.values(),
459 lock_monitor_cb=self._lock_monitor_cb)
461 assert not frozenset(results).intersection(requests)
463 return self._CombineResults(results, requests, procedure)
466 def _EncodeImportExportIO(ieio, ieioargs):
467 """Encodes import/export I/O information.
470 if ieio == constants.IEIO_RAW_DISK:
471 assert len(ieioargs) == 1
472 return (ieioargs[0].ToDict(), )
474 if ieio == constants.IEIO_SCRIPT:
475 assert len(ieioargs) == 2
476 return (ieioargs[0].ToDict(), ieioargs[1])
481 class RpcRunner(object):
def __init__(self, context):
  """Sets up the RPC runner.

  @type context: C{masterd.GanetiContext}
  @param context: Ganeti context; its C{cfg} is used to resolve node
      addresses and its C{glm} to register with the lock monitor

  """
  self._cfg = context.cfg
  # Node names are resolved through the cluster configuration
  resolver = compat.partial(_NodeConfigResolver,
                            self._cfg.GetNodeInfo,
                            self._cfg.GetAllNodesInfo)
  noded_port = netutils.GetDaemonPort(constants.NODED)
  self._proc = _RpcProcessor(resolver, noded_port,
                             lock_monitor_cb=context.glm.AddToLockMonitor)
499 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
500 """Convert the given instance to a dict.
502 This is done via the instance's ToDict() method and additionally
503 we fill the hvparams with the cluster defaults.
505 @type instance: L{objects.Instance}
506 @param instance: an Instance object
507 @type hvp: dict or None
508 @param hvp: a dictionary with overridden hypervisor parameters
509 @type bep: dict or None
510 @param bep: a dictionary with overridden backend parameters
511 @type osp: dict or None
512 @param osp: a dictionary with overridden os parameters
514 @return: the instance dict, with the hvparams filled with the
518 idict = instance.ToDict()
519 cluster = self._cfg.GetClusterInfo()
520 idict["hvparams"] = cluster.FillHV(instance)
522 idict["hvparams"].update(hvp)
523 idict["beparams"] = cluster.FillBE(instance)
525 idict["beparams"].update(bep)
526 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
528 idict["osparams"].update(osp)
529 for nic in idict["nics"]:
530 nic['nicparams'] = objects.FillDict(
531 cluster.nicparams[constants.PP_DEFAULT],
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
  """Runs one RPC procedure on several nodes.

  The arguments are serialized to JSON and handed to the configuration-based
  RPC processor.

  @return: the processor's per-node results (presumably a dict of node name
      to L{RpcResult})

  """
  return self._proc(node_list, procedure,
                    serializer.DumpJson(args, indent=False),
                    read_timeout=read_timeout)
543 def _StaticMultiNodeCall(node_list, procedure, args,
544 address_list=None, read_timeout=None):
545 """Helper for making a multi-node static call
548 body = serializer.DumpJson(args, indent=False)
550 if address_list is None:
551 resolver = _SsconfResolver
553 # Caller provided an address list
554 resolver = _StaticResolver(address_list)
556 proc = _RpcProcessor(resolver,
557 netutils.GetDaemonPort(constants.NODED))
558 return proc(node_list, procedure, body, read_timeout=read_timeout)
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
  """Runs one RPC procedure on a single node.

  @return: the result entry for C{node} out of the processor's per-node
      results

  """
  body = serializer.DumpJson(args, indent=False)
  per_node = self._proc([node], procedure, body, read_timeout=read_timeout)
  return per_node[node]
568 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
569 """Helper for making a single-node static call
572 body = serializer.DumpJson(args, indent=False)
573 proc = _RpcProcessor(_SsconfResolver,
574 netutils.GetDaemonPort(constants.NODED))
575 return proc([node], procedure, body, read_timeout=read_timeout)[node]
@_RpcTimeout(_TMO_URGENT)
def call_bdev_sizes(self, node_list, devices):
  """Queries the sizes of the requested block devices on the nodes.

  This is a multi-node call.

  """
  rpc_args = [devices]
  return self._MultiNodeCall(node_list, "bdev_sizes", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Lists the logical volumes of the given volume group.

  This is a multi-node call.

  """
  rpc_args = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Lists the volume groups.

  This is a multi-node call.

  """
  rpc_args = []
  return self._MultiNodeCall(node_list, "vg_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Lists storage units.

  This is a multi-node call.

  """
  rpc_args = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Applies changes to a storage unit.

  This is a single-node call.

  """
  rpc_args = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Runs an operation on a storage unit.

  This is a single-node call.

  """
  rpc_args = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Verifies that a node has every bridge in C{bridges_list}.

  Used to make sure an instance whose interfaces are attached to these
  bridges can be started on the remote node.

  This is a single-node call.

  """
  rpc_args = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep, startup_paused):
  """Starts an instance.

  C{hvp} and C{bep} override the instance's hypervisor and backend
  parameters.

  This is a single-node call.

  """
  rpc_args = [self._InstDict(instance, hvp=hvp, bep=bep), startup_paused]
  return self._SingleNodeCall(node, "instance_start", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Shuts an instance down.

  This is a single-node call.

  """
  rpc_args = [self._InstDict(instance), timeout]
  return self._SingleNodeCall(node, "instance_shutdown", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Collects the data needed to prepare the migration of an instance.

  This is a single-node call.

  @param node: the node currently running the instance
  @type instance: C{objects.Instance}
  @param instance: the instance definition

  """
  rpc_args = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "migration_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepares the target node to receive an instance.

  This is a single-node call.

  @param node: the migration target node
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result of the call_migration_info call
  @param target: target hostname (usually an IP address) as seen on the
      node itself

  """
  rpc_args = [self._InstDict(instance), info, target]
  return self._SingleNodeCall(node, "accept_instance", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_finalize_migration_dst(self, node, instance, info, success):
  """Finishes target-node specific migration work.

  Called after both successful and failed migrations; on failure the
  migration is expected to be aborted.

  This is a single-node call.

  @param node: the migration target node
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result of the call_migration_info call
  @type success: boolean
  @param success: whether the migration succeeded

  """
  rpc_args = [self._InstDict(instance), info, success]
  return self._SingleNodeCall(node, "instance_finalize_migration_dst",
                              rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrates an instance.

  This is a single-node call.

  @param node: the node currently running the instance
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @param target: name of the target node
  @param live: whether to migrate live; its exact meaning is up to the
      hypervisor

  """
  rpc_args = [self._InstDict(instance), target, live]
  return self._SingleNodeCall(node, "instance_migrate", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_instance_finalize_migration_src(self, node, instance, success, live):
  """Finishes the instance migration on the source node.

  This is a single-node call.

  @type instance: L{objects.Instance}
  @param instance: the migrated instance
  @param success: whether the migration succeeded
  @param live: whether a live migration was requested

  """
  rpc_args = [self._InstDict(instance), success, live]
  return self._SingleNodeCall(node, "instance_finalize_migration_src",
                              rpc_args)
764 @_RpcTimeout(_TMO_SLOW)
765 def call_instance_get_migration_status(self, node, instance):
766 """Report migration status.
768 This is a single-node call that must be executed on the source node.
770 @type instance: L{objects.Instance}
771 @param instance: the instance that is being migrated
772 @rtype: L{objects.MigrationStatus}
773 @return: the status of the current migration (one of
774 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
775 progress info that can be retrieved from the hypervisor
778 result = self._SingleNodeCall(node, "instance_get_migration_status",
779 [self._InstDict(instance)])
780 if not result.fail_msg and result.payload is not None:
781 result.payload = objects.MigrationStatus.FromDict(result.payload)
784 @_RpcTimeout(_TMO_NORMAL)
785 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
786 """Reboots an instance.
788 This is a single-node call.
791 return self._SingleNodeCall(node, "instance_reboot",
792 [self._InstDict(inst), reboot_type,
795 @_RpcTimeout(_TMO_1DAY)
796 def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
797 """Installs an OS on the given instance.
799 This is a single-node call.
802 return self._SingleNodeCall(node, "instance_os_add",
803 [self._InstDict(inst, osp=osparams),
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Runs the OS rename script of an instance.

  This is a single-node call.

  """
  rpc_args = [self._InstDict(inst), old_name, debug]
  return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call.

  @param node: the node to query (a single node, not a list)
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance

  """
  return self._SingleNodeCall(node, "instance_info", [instance, hname])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Asks whether an instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check

  """
  rpc_args = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "instance_migratable", rpc_args)
847 @_RpcTimeout(_TMO_URGENT)
848 def call_all_instances_info(self, node_list, hypervisor_list):
849 """Returns information about all instances on the given nodes.
851 This is a multi-node call.
853 @type node_list: list
854 @param node_list: the list of nodes to query
855 @type hypervisor_list: list
856 @param hypervisor_list: the hypervisors to query for instances
859 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Lists the running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to ask for instances

  """
  rpc_args = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", rpc_args)
876 @_RpcTimeout(_TMO_FAST)
877 def call_node_tcp_ping(self, node, source, target, port, timeout,
879 """Do a TcpPing on the remote node
881 This is a single-node call.
884 return self._SingleNodeCall(node, "node_tcp_ping",
885 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Asks a node whether it owns the given IP address.

  This is a single-node call.

  """
  rpc_args = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Returns node information.

  The reply covers memory information as well as volume group size and
  free space.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to query
  @type vg_name: C{string}
  @param vg_name: the volume group to report disk space for
  @type hypervisor_type: C{str}
  @param hypervisor_type: the hypervisor to ask for memory information

  """
  rpc_args = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
  """Adds or removes a host entry in the node's hosts file.

  This is a single-node call.

  @param node: the node to call
  @param mode: the operation, currently "add" or "remove"
  @param name: the host name to modify
  @param ip: the IP address of the entry (only meaningful for "add")

  """
  rpc_args = [mode, name, ip]
  return self._SingleNodeCall(node, "etc_hosts_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Asks the nodes to verify the parameters in C{checkdict}.

  This is a multi-node call.

  """
  rpc_args = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", rpc_args)
946 @_RpcTimeout(_TMO_FAST)
947 def call_node_start_master_daemons(cls, node, no_voting):
948 """Starts master daemons on a node.
950 This is a single-node call.
953 return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
957 @_RpcTimeout(_TMO_FAST)
958 def call_node_activate_master_ip(cls, node):
959 """Activates master IP on a node.
961 This is a single-node call.
964 return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
967 @_RpcTimeout(_TMO_FAST)
968 def call_node_stop_master(cls, node):
969 """Deactivates master IP and stops master daemons on a node.
971 This is a single-node call.
974 return cls._StaticSingleNodeCall(node, "node_stop_master", [])
977 @_RpcTimeout(_TMO_FAST)
978 def call_node_deactivate_master_ip(cls, node):
979 """Deactivates master IP on a node.
981 This is a single-node call.
984 return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
987 @_RpcTimeout(_TMO_FAST)
988 def call_node_change_master_netmask(cls, node, netmask):
989 """Change master IP netmask.
991 This is a single-node call.
994 return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
998 @_RpcTimeout(_TMO_URGENT)
999 def call_master_info(cls, node_list):
1000 """Query master info.
1002 This is a multi-node call.
1005 # TODO: should this method query down nodes?
1006 return cls._StaticMultiNodeCall(node_list, "master_info", [])
1009 @_RpcTimeout(_TMO_URGENT)
1010 def call_version(cls, node_list):
1011 """Query node version.
1013 This is a multi-node call.
1016 return cls._StaticMultiNodeCall(node_list, "version", [])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Asks a node to create a block device.

  This is a single-node call.

  """
  rpc_args = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
  """Wipes C{size} units of a block device starting at C{offset}.

  This is a single-node call.

  """
  rpc_args = [bdev.ToDict(), offset, size]
  return self._SingleNodeCall(node, "blockdev_wipe", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Asks a node to remove a block device.

  This is a single-node call.

  """
  rpc_args = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Renames block devices.

  This is a single-node call.

  @param devlist: sequence of (disk object, new unique id) pairs

  """
  renames = []
  for (dev, new_uid) in devlist:
    renames.append((dev.ToDict(), new_uid))
  return self._SingleNodeCall(node, "blockdev_rename", renames)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_pause_resume_sync(self, node, disks, pause):
  """Pauses or resumes the sync of the given block devices.

  This is a single-node call.

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  rpc_args = [disk_dicts, pause]
  return self._SingleNodeCall(node, "blockdev_pause_resume_sync", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
  """Asks a node to assemble a block device.

  This is a single-node call.

  """
  rpc_args = [disk.ToDict(), owner, on_primary, idx]
  return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Asks a node to shut a block device down.

  This is a single-node call.

  """
  rpc_args = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
1086 @_RpcTimeout(_TMO_NORMAL)
1087 def call_blockdev_addchildren(self, node, bdev, ndevs):
1088 """Request adding a list of children to a (mirroring) device.
1090 This is a single-node call.
1093 return self._SingleNodeCall(node, "blockdev_addchildren",
1095 [disk.ToDict() for disk in ndevs]])
1097 @_RpcTimeout(_TMO_NORMAL)
1098 def call_blockdev_removechildren(self, node, bdev, ndevs):
1099 """Request removing a list of children from a (mirroring) device.
1101 This is a single-node call.
1104 return self._SingleNodeCall(node, "blockdev_removechildren",
1106 [disk.ToDict() for disk in ndevs]])
1108 @_RpcTimeout(_TMO_NORMAL)
1109 def call_blockdev_getmirrorstatus(self, node, disks):
1110 """Request status of a (mirroring) device.
1112 This is a single-node call.
1115 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1116 [dsk.ToDict() for dsk in disks])
1117 if not result.fail_msg:
1118 result.payload = [objects.BlockDevStatus.FromDict(i)
1119 for i in result.payload]
1122 @_RpcTimeout(_TMO_NORMAL)
1123 def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1124 """Request status of (mirroring) devices from multiple nodes.
1126 This is a multi-node call.
1129 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1130 [dict((name, [dsk.ToDict() for dsk in disks])
1131 for name, disks in node_disks.items())])
1132 for nres in result.values():
1136 for idx, (success, status) in enumerate(nres.payload):
1138 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1142 @_RpcTimeout(_TMO_NORMAL)
1143 def call_blockdev_find(self, node, disk):
1144 """Request identification of a given block device.
1146 This is a single-node call.
1149 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1150 if not result.fail_msg and result.payload is not None:
1151 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes block devices belonging to C{instance_name}.

  This is a single-node call.

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Queries the sizes of the given disks.

  This is a single-node call.

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given DRBD devices.

  This is a multi-node call.

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches (reconnects) the network of the given DRBD devices.

  This is the counterpart of L{call_drbd_disconnect_net}; the previous
  docstring wrongly described it as disconnecting the devices.

  This is a multi-node call.

  @param instance_name: name of the instance owning the disks
  @param multimaster: passed through unchanged to the node daemon

  """
  return self._MultiNodeCall(node_list, "drbd_attach_net",
                             [nodes_ip, [cf.ToDict() for cf in disks],
                              instance_name, multimaster])
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of the DRBD devices is complete.

  This is a multi-node call.

  """
  disk_dicts = [dsk.ToDict() for dsk in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Queries the DRBD helper.

  This is a multi-node call.

  """
  rpc_args = []
  return self._MultiNodeCall(node_list, "drbd_helper", rpc_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Upload a file to the given nodes.

  The receiving node may refuse the operation if the file is not among
  the files it accepts for upload (that check happens on the remote side
  and is not visible here).

  This is a multi-node call. It is a class method, so it can be invoked
  on the class itself as well as on an instance.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed

  """
  file_contents = utils.ReadFile(file_name)
  data = _Compress(file_contents)
  # Mode, owner/group and timestamps are sent along with the data
  # (presumably so the remote copy matches the local one); uid/gid are
  # translated through runtime.GetEnts() before sending.
  st = os.stat(file_name)
  getents = runtime.GetEnts()
  params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
  return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Write ssconf files on the given nodes.

  This is a multi-node call. It is a class method, so it can be invoked
  on the class itself as well as on an instance.

  @param node_list: the nodes to write the files on
  @param values: sent as the single RPC argument

  """
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
@_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
  """Run an OOB program on a node.

  This is a single-node call.

  @param node: the node that executes the program
  @param oob_program: the program to run
  @param command: the command given to the program
  @param remote_node: forwarded unchanged to the remote procedure
  @param timeout: forwarded unchanged to the remote procedure

  """
  oob_args = [oob_program, command, remote_node, timeout]
  return self._SingleNodeCall(node, "run_oob", oob_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Ask the given nodes to diagnose their OS definitions.

  This is a multi-node call; the remote procedure takes no arguments.

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  @param node: the node to query
  @param name: the OS name
  @return: the RPC result; when the call succeeded and the payload is a
      dict, the payload is replaced by an L{objects.OS} built from it

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  if not result.fail_msg and isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  # Without this return the decoded result was discarded and None returned
  return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Run the validation routine of an OS on several nodes.

  This is a multi-node call.

  @param required: forwarded unchanged to the nodes
  @param nodes: the nodes on which the validation runs
  @param name: the OS name
  @param checks: forwarded unchanged to the nodes
  @param params: forwarded unchanged to the nodes

  """
  rpc_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @param node_list: the nodes on which the hooks run
  @param hpath: the hooks path, forwarded unchanged
  @param phase: the hooks phase, forwarded unchanged
  @type env: dict
  @param env: a dictionary with the environment

  """
  params = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", params)
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @param node: the node on which the iallocator runs
  @param name: the iallocator name
  @param idata: the json-encoded input string

  """
  return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
  """Request growing the given block device.

  This is a single-node call.

  @param node: the node holding the device
  @param cf_bdev: the block device object, serialized with C{ToDict}
  @param amount: how much to grow the device by (NOTE(review): units
      are not visible here -- confirm)
  @param dryrun: dry-run flag, forwarded unchanged to the node

  """
  return self._SingleNodeCall(node, "blockdev_grow",
                              [cf_bdev.ToDict(), amount, dryrun])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a disk from one node to another.

  This is a single-node call, registered with the C{_TMO_1DAY} timeout.

  @param node: the node holding the disk
  @param cf_bdev: the disk object, serialized with C{ToDict}
  @param dest_node: the destination node
  @param dest_path: the destination path on C{dest_node}
  @param cluster_name: forwarded unchanged to the node

  """
  export_args = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
  return self._SingleNodeCall(node, "blockdev_export", export_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Ask a node to snapshot the given block device.

  This is a single-node call.

  @param node: the node holding the device
  @param cf_bdev: the block device object, serialized with C{ToDict}

  """
  bdev_dict = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_snapshot", [bdev_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @param node: the node performing the export
  @param instance: the instance being exported, serialized via
      C{self._InstDict}
  @param snap_disks: snapshot disk objects; boolean entries are sent
      as-is (NOTE(review): presumably placeholders for disks without a
      snapshot -- confirm against callers), all other entries are
      serialized with C{ToDict}

  """
  flat_disks = []
  for disk in snap_disks:
    # Booleans are not disk objects and have no ToDict method
    if isinstance(disk, bool):
      flat_disks.append(disk)
    else:
      flat_disks.append(disk.ToDict())

  return self._SingleNodeCall(node, "finalize_export",
                              [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Query export information stored under a path on a node.

  This is a single-node call.

  @param node: the node to query
  @param path: the export path

  """
  info_args = [path]
  return self._SingleNodeCall(node, "export_info", info_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Fetch the list of stored exports from the given nodes.

  This is a multi-node call; the remote procedure takes no arguments.

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Ask a node to remove the given export.

  This is a single-node call.

  @param node: the node holding the export
  @param export: the export to remove, forwarded unchanged

  """
  remove_args = [export]
  return self._SingleNodeCall(node, "export_remove", remove_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  directory on that node.

  This is a single-node call. It is a class method, so it can be invoked
  on the class itself as well as on an instance.

  @param node: the node leaving the cluster
  @param modify_ssh_setup: flag forwarded unchanged to the node

  """
  return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                   [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Fetch all volumes present on the given node(s).

  This is a multi-node call; the remote procedure takes no arguments.

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Ask a node to demote itself from the master candidate role.

  This is a single-node call; the remote procedure takes no arguments.

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Try to powercycle a node.

  This is a single-node call.

  @param node: the node to powercycle
  @param hypervisor: forwarded unchanged to the node

  """
  cycle_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", cycle_args)
def call_test_delay(self, node_list, duration):
  """Make the given node(s) sleep for a fixed time.

  This is a multi-node call.

  @param node_list: the nodes that should sleep
  @param duration: sleep time, forwarded to the nodes

  """
  # Allow five extra seconds beyond the requested sleep before the RPC
  # read times out
  slack_timeout = int(duration + 5)
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=slack_timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Ask a node to create the given file storage directory.

  This is a single-node call.

  @param node: the node on which the directory is created
  @param file_storage_dir: the directory to create

  """
  dir_args = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_create", dir_args)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Ask a node to remove the given file storage directory.

  This is a single-node call.

  @param node: the node on which the directory is removed
  @param file_storage_dir: the directory to remove

  """
  dir_args = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_remove", dir_args)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Ask a node to rename a file storage directory.

  This is a single-node call.

  @param node: the node holding the directory
  @param old_file_storage_dir: the current directory name
  @param new_file_storage_dir: the new directory name

  """
  rename_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rename_args)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call. It is a class method, so it can be invoked
  on the class itself as well as on an instance.

  @param node_list: the nodes to update
  @param address_list: optional node addresses, passed through as the
      C{address_list} keyword
  @param file_name: name of the job queue file to write
  @param content: the new file contents; compressed with C{_Compress}
      before sending

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, _Compress(content)],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge the job queue on a node.

  This is a single-node call; the remote procedure takes no arguments.
  It is a class method, so it can be invoked on the class itself as well
  as on an instance.

  @param node: the node whose job queue is purged

  """
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call. It is a class method, so it can be invoked
  on the class itself as well as on an instance.

  @param node_list: the nodes to contact
  @param address_list: optional node addresses, passed through as the
      C{address_list} keyword
  @param rename: sent as-is as the complete RPC argument list (unlike
      most calls here, it is not wrapped in another list)

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Have the given nodes validate a set of hypervisor parameters.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated; they are
      combined with the cluster-level parameters for C{hvname} (an empty
      dict if none are set) via C{objects.FillDict} before sending

  """
  cluster_hvparams = self._cfg.GetClusterInfo().hvparams.get(hvname, {})
  merged_params = objects.FillDict(cluster_hvparams, hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, merged_params])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Create a new X509 certificate for SSL/TLS on a node.

  This is a single-node call.

  @param node: the node that creates the certificate
  @param validity: Validity in seconds

  """
  cert_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", cert_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Remove an X509 certificate from a node.

  This is a single-node call.

  @param node: the node holding the certificate
  @param name: Certificate name

  """
  cert_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", cert_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, component,
                      dest, dest_args):
  """Start a listener for an import.

  This is a single-node call.

  @param node: Node name
  @param opts: import/export options, serialized with C{ToDict}
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is being imported
  @param dest: import destination; sent as-is and also encoded together
      with C{dest_args} via C{_EncodeImportExportIO}
  @param dest_args: extra arguments for C{dest}

  """
  import_args = [opts.ToDict(), self._InstDict(instance), component, dest,
                 _EncodeImportExportIO(dest, dest_args)]
  return self._SingleNodeCall(node, "import_start", import_args)
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, component, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @param node: Node name
  @param opts: import/export options, serialized with C{ToDict}
  @param host: forwarded unchanged (NOTE(review): presumably the
      address of the import listener -- confirm)
  @param port: forwarded unchanged, alongside C{host}
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is being exported
  @param source: export source; sent as-is and also encoded together
      with C{source_args} via C{_EncodeImportExportIO}
  @param source_args: extra arguments for C{source}

  """
  return self._SingleNodeCall(node, "export_start",
                              [opts.ToDict(), host, port,
                               self._InstDict(instance),
                               component, source,
                               _EncodeImportExportIO(source, source_args)])
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @return: the RPC result; if the call succeeded, its payload is replaced
      by a list holding, for each entry of the original payload, an
      L{objects.ImportExportStatus} instance, or None where no status
      could be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  if not result.fail_msg:
    decoded = []

    for i in result.payload:
      if i is None:
        decoded.append(None)
        continue
      decoded.append(objects.ImportExportStatus.FromDict(i))

    result.payload = decoded

  # Without this return the decoded result was discarded and None returned
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Abort a running import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  abort_args = [name]
  return self._SingleNodeCall(node, "impexp_abort", abort_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Clean up after an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  cleanup_args = [name]
  return self._SingleNodeCall(node, "impexp_cleanup", cleanup_args)