4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
50 # pylint has a bug here, doesn't see this import
51 import ganeti.http.client # pylint: disable=W0611
54 # Timeout for connecting to nodes (seconds)
55 _RPC_CONNECT_TIMEOUT = 5
57 _RPC_CLIENT_HEADERS = [
58 "Content-type: %s" % http.HTTP_APP_JSON,
# Various time constants for the timeout table. Values are in seconds and
# are handed to the _RpcTimeout decorator on the call_* methods.
# NOTE(review): _TMO_1DAY is referenced by call_instance_os_add but its
# definition is not visible in this view -- confirm it exists.
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour
70 # Timeout table that will be built later by decorators
71 # Guidelines for choosing timeouts:
72 # - call used during watcher: timeout -> 1min, _TMO_URGENT
73 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74 # - other calls: 15 min, _TMO_NORMAL
75 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
82 """Initializes the module-global HTTP client manager.
84 Must be called before using any RPC function and while exactly one thread is
88 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89 # one thread running. This check is just a safety measure -- it doesn't
91 assert threading.activeCount() == 1, \
92 "Found more than one active thread when initializing pycURL"
94 logging.info("Using PycURL %s", pycurl.version)
96 pycurl.global_init(pycurl.GLOBAL_ALL)
100 """Stops the module-global HTTP client manager.
102 Must be called before quitting the program and while exactly one thread is
106 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
    """Applies the node daemon RPC settings to a cURL handle.

    The node daemon certificate file serves as CA bundle, client
    certificate and client key alike.

    @param curl: cURL handle to configure; only C{setopt} is called on it

    """
    cert_path = str(constants.NODED_CERT_FILE)

    # Options are applied in exactly this order
    settings = [
        (pycurl.FOLLOWLOCATION, False),
        (pycurl.CAINFO, cert_path),
        # Host name verification is off; the peer certificate itself is
        # still verified against CAINFO
        (pycurl.SSL_VERIFYHOST, 0),
        (pycurl.SSL_VERIFYPEER, True),
        (pycurl.SSLCERTTYPE, "PEM"),
        (pycurl.SSLCERT, cert_path),
        (pycurl.SSLKEYTYPE, "PEM"),
        (pycurl.SSLKEY, cert_path),
        (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]
    for (option, value) in settings:
        curl.setopt(option, value)
123 # Aliasing this module avoids the following warning by epydoc: "Warning: No
124 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125 _threading = threading
128 class _RpcThreadLocal(_threading.local):
129 def GetHttpClientPool(self):
130 """Returns a per-thread HTTP client pool.
132 @rtype: L{http.client.HttpClientPool}
137 except AttributeError:
138 pool = http.client.HttpClientPool(_ConfigRpcCurl)
144 # Remove module alias (see above)
148 _thread_local = _RpcThreadLocal()
151 def _RpcTimeout(secs):
152 """Timeout decorator.
154 When applied to a rpc call_* function, it updates the global timeout
155 table with the given function/timeout.
160 assert name.startswith("call_")
161 _TIMEOUTS[name[len("call_"):]] = secs
167 """RPC-wrapper decorator.
169 When applied to a function, it runs it with the RPC system
170 initialized, and it shutsdown the system afterwards. This means the
171 function must be called without RPC being initialized.
174 def wrapper(*args, **kwargs):
177 return fn(*args, **kwargs)
class RpcResult(object):
    """RPC Result class.

    This class holds an RPC result. It is needed since in multi-node
    calls we can't raise an exception just because one out of many
    failed, and therefore we use this class to encapsulate the result.

    @ivar data: the data payload, for successful results, or None
    @ivar call: the name of the RPC call
    @ivar node: the name of the node to which we made the call
    @ivar offline: whether the operation failed because the node was
        offline, as opposed to actual failure; offline=True will always
        imply failed=True, in order to allow simpler checking if
        the user doesn't care about the exact failure mode
    @ivar fail_msg: the error message if the call failed
    @ivar payload: second element of a successful result, else None

    """
    def __init__(self, data=None, failed=False, offline=False,
                 call=None, node=None):
        """Classifies a raw RPC answer.

        @param data: raw answer; a two-element C{(status, payload)}
            sequence for a completed call, or the error information when
            C{failed} is set
        @param failed: whether the transport already reported a failure
        @param offline: whether the node was skipped as offline
        @param call: name of the RPC call
        @param node: name of the node the call was made to

        """
        self.offline = offline
        self.call = call
        self.node = node

        if offline:
            self.fail_msg = "Node is marked offline"
            self.data = self.payload = None
        elif failed:
            self.fail_msg = self._EnsureErr(data)
            self.data = self.payload = None
        else:
            self.data = data
            if not isinstance(self.data, (tuple, list)):
                self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                                 type(self.data))
                self.payload = None
            elif len(self.data) != 2:
                self.fail_msg = ("RPC layer error: invalid result length (%d), "
                                 "expected 2" % len(self.data))
                self.payload = None
            elif not self.data[0]:
                # Remote side reported a failure; second item is the error
                self.fail_msg = self._EnsureErr(self.data[1])
                self.payload = None
            else:
                # Finally success
                self.fail_msg = None
                self.payload = data[1]

        # Every code path above must have set the full attribute set
        for attr_name in ["call", "data", "fail_msg",
                          "node", "offline", "payload"]:
            assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

    @staticmethod
    def _EnsureErr(val):
        """Helper to ensure we return a 'True' value for error."""
        if val:
            return val
        return "No error information"

    def Raise(self, msg, prereq=False, ecode=None):
        """If the result has failed, raise an OpExecError.

        This is used so that LU code doesn't have to check for each
        result, but instead can call this function.

        @param msg: message prefix; a false value selects a default message
            naming the call and the node
        @param prereq: raise C{errors.OpPrereqError} instead of
            C{errors.OpExecError}
        @param ecode: optional error code passed as second exception argument

        """
        if not self.fail_msg:
            return

        if not msg: # one could pass None for default message
            msg = ("Call '%s' to node '%s' has failed: %s" %
                   (self.call, self.node, self.fail_msg))
        else:
            msg = "%s: %s" % (msg, self.fail_msg)

        if prereq:
            ec = errors.OpPrereqError
        else:
            ec = errors.OpExecError

        if ecode is not None:
            args = (msg, ecode)
        else:
            args = (msg, )
        raise ec(*args) # pylint: disable=W0142
def _AddressLookup(node_list,
                   ssc=ssconf.SimpleStore,
                   nslookup_fn=netutils.Hostname.GetIP):
    """Return addresses for given node names.

    Addresses come from the ssconf node/primary-IP list; names missing
    from that list are resolved through C{nslookup_fn}.

    @type node_list: list
    @param node_list: List of node names
    @type ssc: class
    @param ssc: SimpleStore class that is used to obtain node->ip mappings
    @type nslookup_fn: callable
    @param nslookup_fn: function used to do NS lookup
    @rtype: list of addresses and/or None's
    @return: List of corresponding addresses, if found

    """
    ss = ssc()
    iplist = ss.GetNodePrimaryIPList()
    family = ss.GetPrimaryIPFamily()

    # Every entry is split on whitespace into a (name, address) pair
    ipmap = dict(entry.split() for entry in iplist)

    addresses = []
    for node in node_list:
        address = ipmap.get(node)
        if address is None:
            # Not known to ssconf, fall back to a name lookup
            address = nslookup_fn(node, family=family)
        addresses.append(address)

    return addresses
300 This class, given a (remote) method name, a list of parameters and a
301 list of nodes, will contact (in parallel) all nodes, and return a
302 dict of results (key: node name, value: result).
304 One current bug is that generic failure is still signaled by
305 'False' result, which is not good. This overloading of values can
def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
    """Prepares a client for one remote procedure.

    @param procedure: name of the remote procedure; must be present in the
        timeout table
    @param body: request body sent to every node
    @param port: node daemon port
    @param address_lookup_fn: callable mapping a list of node names to a
        list of addresses

    """
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                    " timeouts table")
    self.procedure = procedure
    self.body = body
    self.port = port
    # Maps node name to its pending HTTP request
    self._request = {}
    self._address_lookup_fn = address_lookup_fn
def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Add a list of nodes to the target nodes.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list
    @type read_timeout: int
    @param read_timeout: overwrites default timeout for operation

    """
    addresses = address_list
    if addresses is None:
        # Always use IP address instead of node name
        addresses = self._address_lookup_fn(node_list)

    assert len(node_list) == len(addresses), \
        "Name and address lists must have the same length"

    for (node_name, node_address) in zip(node_list, addresses):
        self.ConnectNode(node_name, node_address, read_timeout=read_timeout)
def ConnectNode(self, name, address=None, read_timeout=None):
    """Add a node to the target list.

    @type name: str
    @param name: the node name
    @type address: str
    @param address: the node address, if known
    @type read_timeout: int
    @param read_timeout: overwrites default timeout for operation

    """
    if address is None:
        # Always use IP address instead of node name
        address = self._address_lookup_fn([name])[0]

    assert(address is not None)

    if read_timeout is None:
        # Fall back to the timeout registered for this procedure
        read_timeout = _TIMEOUTS[self.procedure]

    self._request[name] = \
        http.client.HttpClientRequest(str(address), self.port,
                                      http.HTTP_PUT,
                                      str("/%s" % self.procedure),
                                      headers=_RPC_CLIENT_HEADERS,
                                      post_data=str(self.body),
                                      read_timeout=read_timeout)
367 def GetResults(self, http_pool=None):
368 """Call nodes and return results.
371 @return: List of RPC results
375 http_pool = http.client.HttpClientPool(_ConfigRpcCurl)
377 http_pool.ProcessRequests(self._request.values())
381 for name, req in self._request.iteritems():
382 if req.success and req.resp_status_code == http.HTTP_OK:
383 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384 node=name, call=self.procedure)
387 # TODO: Better error reporting
393 logging.error("RPC error in %s from node %s: %s",
394 self.procedure, name, msg)
395 results[name] = RpcResult(data=msg, failed=True, node=name,
def _EncodeImportExportIO(ieio, ieioargs):
    """Encodes import/export I/O information.

    @param ieio: I/O type, compared against C{constants.IEIO_RAW_DISK}
        and C{constants.IEIO_SCRIPT}
    @param ieioargs: arguments belonging to the I/O type
    @return: for raw-disk and script I/O a tuple whose first item was
        converted with C{ToDict()}; otherwise C{ieioargs} unchanged

    """
    if ieio == constants.IEIO_RAW_DISK:
        assert len(ieioargs) == 1
        return (ieioargs[0].ToDict(), )

    if ieio == constants.IEIO_SCRIPT:
        assert len(ieioargs) == 2
        return (ieioargs[0].ToDict(), ieioargs[1])

    # Presumably the remaining I/O types carry nothing that needs
    # conversion -- verify against the callers
    return ieioargs
416 class RpcRunner(object):
417 """RPC runner class"""
def __init__(self, cfg):
    """Initializes the RPC runner.

    @type cfg: C{config.ConfigWriter}
    @param cfg: the configuration object that will be used to get data
        (cluster and node information) for the calls

    """
    # Read by _InstDict, _ConnectList and _ConnectNode
    self._cfg = cfg
    self.port = netutils.GetDaemonPort(constants.NODED)
430 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431 """Convert the given instance to a dict.
433 This is done via the instance's ToDict() method and additionally
434 we fill the hvparams with the cluster defaults.
436 @type instance: L{objects.Instance}
437 @param instance: an Instance object
438 @type hvp: dict or None
439 @param hvp: a dictionary with overridden hypervisor parameters
440 @type bep: dict or None
441 @param bep: a dictionary with overridden backend parameters
442 @type osp: dict or None
443 @param osp: a dictionary with overridden os parameters
445 @return: the instance dict, with the hvparams filled with the
449 idict = instance.ToDict()
450 cluster = self._cfg.GetClusterInfo()
451 idict["hvparams"] = cluster.FillHV(instance)
453 idict["hvparams"].update(hvp)
454 idict["beparams"] = cluster.FillBE(instance)
456 idict["beparams"].update(bep)
457 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
459 idict["osparams"].update(osp)
460 for nic in idict["nics"]:
461 nic['nicparams'] = objects.FillDict(
462 cluster.nicparams[constants.PP_DEFAULT],
466 def _ConnectList(self, client, node_list, call, read_timeout=None):
467 """Helper for computing node addresses.
469 @type client: L{ganeti.rpc.Client}
470 @param client: a C{Client} instance
471 @type node_list: list
472 @param node_list: the node list we should connect
474 @param call: the name of the remote procedure call, for filling in
475 correctly any eventual offline nodes' results
476 @type read_timeout: int
477 @param read_timeout: overwrites the default read timeout for the
481 all_nodes = self._cfg.GetAllNodesInfo()
485 for node in node_list:
486 if node in all_nodes:
487 if all_nodes[node].offline:
488 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
490 val = all_nodes[node].primary_ip
493 addr_list.append(val)
494 name_list.append(node)
496 client.ConnectList(name_list, address_list=addr_list,
497 read_timeout=read_timeout)
500 def _ConnectNode(self, client, node, call, read_timeout=None):
501 """Helper for computing one node's address.
503 @type client: L{ganeti.rpc.Client}
504 @param client: a C{Client} instance
506 @param node: the node we should connect
508 @param call: the name of the remote procedure call, for filling in
509 correctly any eventual offline nodes' results
510 @type read_timeout: int
511 @param read_timeout: overwrites the default read timeout for the
515 node_info = self._cfg.GetNodeInfo(node)
516 if node_info is not None:
517 if node_info.offline:
518 return RpcResult(node=node, offline=True, call=call)
519 addr = node_info.primary_ip
522 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Helper for making a multi-node call

    @param node_list: names of the nodes to call
    @param procedure: name of the remote procedure
    @param args: arguments, serialized to JSON as the request body
    @param read_timeout: overwrites the default read timeout
    @rtype: dict
    @return: results keyed by node name; offline nodes are included with
        their offline result

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, self.port)
    # Offline nodes are answered locally, the others are queried
    results = self._ConnectList(c, node_list, procedure,
                                read_timeout=read_timeout)
    results.update(c.GetResults())
    return results
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
    """Helper for making a multi-node static call

    Unlike L{_MultiNodeCall} this neither needs an instance nor consults
    the configuration.

    @param node_list: names of the nodes to call
    @param procedure: name of the remote procedure
    @param args: arguments, serialized to JSON as the request body
    @param address_list: optional addresses matching C{node_list}
    @param read_timeout: overwrites the default read timeout
    @return: whatever C{Client.GetResults} returns

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
    c.ConnectList(node_list, address_list=address_list,
                  read_timeout=read_timeout)
    return c.GetResults()
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
    """Helper for making a single-node call

    @param node: name of the node to call
    @param procedure: name of the remote procedure
    @param args: arguments, serialized to JSON as the request body
    @param read_timeout: overwrites the default read timeout
    @return: the result for C{node}; the offline result if the node is
        marked offline

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, self.port)
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
    if result is None:
        # we did connect, node is not offline
        result = c.GetResults()[node]
    return result
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
    """Helper for making a single-node static call

    Unlike L{_SingleNodeCall} this neither needs an instance nor consults
    the configuration.

    @param node: name of the node to call
    @param procedure: name of the remote procedure
    @param args: arguments, serialized to JSON as the request body
    @param read_timeout: overwrites the default read timeout
    @return: the result for C{node}

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
    c.ConnectNode(node, read_timeout=read_timeout)
    return c.GetResults()[node]
571 """Compresses a string for transport over RPC.
573 Small amounts of data are not compressed.
578 @return: Encoded data to send
581 # Small amounts of data are not compressed
583 return (constants.RPC_ENCODING_NONE, data)
585 # Compress with zlib and encode in base64
586 return (constants.RPC_ENCODING_ZLIB_BASE64,
587 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_bdev_sizes(self, node_list, devices):
    """Gets the sizes of requested block devices present on a node

    This is a multi-node call.

    @param node_list: the nodes to query
    @param devices: the block devices whose sizes are requested

    """
    params = [devices]
    return self._MultiNodeCall(node_list, "bdev_sizes", params)
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
    """Gets the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: name of the volume group

    """
    params = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", params)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
    """Gets the volume group list.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "vg_list", no_params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Get list of storage units.

    This is a multi-node call.

    All arguments after C{node_list} are forwarded unchanged, in order.

    """
    params = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Modify a storage unit.

    This is a single-node call.

    All arguments after C{node} are forwarded unchanged, in order.

    """
    params = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
    """Executes an operation on a storage unit.

    This is a single-node call.

    All arguments after C{node} are forwarded unchanged, in order.

    """
    params = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", params)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
    """Checks if a node has all the bridges given.

    The remote node verifies that every bridge in C{bridges_list} is
    present, so that an instance using interfaces on those bridges can be
    started.

    This is a single-node call.

    """
    params = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep, startup_paused):
    """Starts an instance.

    This is a single-node call.

    @param instance: the instance, sent as the dict built by C{_InstDict}
    @param hvp: hypervisor parameter overrides handed to C{_InstDict}
    @param bep: backend parameter overrides handed to C{_InstDict}
    @param startup_paused: forwarded unchanged to the node

    """
    instance_dict = self._InstDict(instance, hvp=hvp, bep=bep)
    params = [instance_dict, startup_paused]
    return self._SingleNodeCall(node, "instance_start", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
    """Stops an instance.

    This is a single-node call.

    @param instance: the instance, sent as the dict built by C{_InstDict}
    @param timeout: forwarded unchanged to the node

    """
    params = [self._InstDict(instance), timeout]
    return self._SingleNodeCall(node, "instance_shutdown", params)
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
    """Gather the information necessary to prepare an instance migration.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    params = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "migration_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
    """Prepare a node to accept an instance.

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type target: string
    @param target: target hostname (usually ip address) (on the node itself)

    """
    params = [self._InstDict(instance), info, target]
    return self._SingleNodeCall(node, "accept_instance", params)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
    """Finalize any target-node migration specific operation.

    This is called both in case of a successful migration and in case of
    error (in which case it should abort the migration).

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    params = [self._InstDict(instance), info, success]
    return self._SingleNodeCall(node, "finalize_migration", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
    """Migrate an instance.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type target: string
    @param target: the target node name
    @type live: boolean
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    params = [self._InstDict(instance), target, live]
    return self._SingleNodeCall(node, "instance_migrate", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
    """Reboots an instance.

    This is a single-node call.

    @param inst: the instance, sent as the dict built by C{_InstDict}
    @param reboot_type: forwarded unchanged to the node
    @param shutdown_timeout: forwarded unchanged to the node

    """
    return self._SingleNodeCall(node, "instance_reboot",
                                [self._InstDict(inst), reboot_type,
                                 shutdown_timeout])
@_RpcTimeout(_TMO_1DAY)
def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
    """Installs an OS on the given instance.

    This is a single-node call.

    @param inst: the instance, sent as the dict built by C{_InstDict}
    @param reinstall: forwarded unchanged to the node
    @param debug: forwarded unchanged to the node
    @param osparams: OS parameter overrides handed to C{_InstDict}

    """
    return self._SingleNodeCall(node, "instance_os_add",
                                [self._InstDict(inst, osp=osparams),
                                 reinstall, debug])
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
    """Run the OS rename script for an instance.

    This is a single-node call.

    @param inst: the instance, sent as the dict built by C{_InstDict}
    @param old_name: forwarded unchanged to the node
    @param debug: forwarded unchanged to the node

    """
    params = [self._InstDict(inst), old_name, debug]
    return self._SingleNodeCall(node, "instance_run_rename", params)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
    """Returns information about a single instance.

    This is a single-node call.

    @type node: string
    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @type hname: string
    @param hname: the hypervisor type of the instance

    """
    params = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
    """Checks whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    params = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "instance_migratable", params)
@_RpcTimeout(_TMO_URGENT)
def call_all_instances_info(self, node_list, hypervisor_list):
    """Returns information about all instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    return self._MultiNodeCall(node_list, "all_instances_info",
                               [hypervisor_list])
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
    """Returns the list of running instances on a given node.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", params)
851 @_RpcTimeout(_TMO_FAST)
852 def call_node_tcp_ping(self, node, source, target, port, timeout,
854 """Do a TcpPing on the remote node
856 This is a single-node call.
859 return self._SingleNodeCall(node, "node_tcp_ping",
860 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
    """Checks if a node has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for

    """
    params = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", params)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Return node information.

    This will return memory information and volume group size and free
    space.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
        information
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for
        information

    """
    params = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
    """Modify hosts file with name

    This is issued through the single-node helper.

    @type node: string
    @param node: The node to call
    @type mode: string
    @param mode: The mode to operate. Currently "add" or "remove"
    @type name: string
    @param name: The host name to be modified
    @type ip: string
    @param ip: The ip of the entry (just valid if mode is "add")

    """
    params = [mode, name, ip]
    return self._SingleNodeCall(node, "etc_hosts_modify", params)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Request verification of given parameters.

    This is a multi-node call.

    @param node_list: the nodes to verify
    @param checkdict: forwarded unchanged to the nodes
    @param cluster_name: forwarded unchanged to the nodes

    """
    params = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", params)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_start_master_daemons(cls, node, no_voting):
    """Starts master daemons on a node.

    This is a single-node call.

    @param node: the node to call
    @param no_voting: forwarded unchanged to the node

    """
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
                                     [no_voting])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_activate_master_ip(cls, node):
    """Activates master IP on a node.

    This is a single-node call.

    @param node: the node to call

    """
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node):
    """Deactivates master IP and stops master daemons on a node.

    This is a single-node call.

    @param node: the node to call

    """
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_deactivate_master_ip(cls, node):
    """Deactivates master IP on a node.

    This is a single-node call.

    @param node: the node to call

    """
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
    """Query master info.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    # TODO: should this method query down nodes?
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
    """Query node version.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    return cls._StaticMultiNodeCall(node_list, "version", [])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    This is a single-node call.

    @param bdev: the device, sent as the result of its C{ToDict()}

    The remaining arguments are forwarded unchanged, in order.

    """
    params = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", params)
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
    """Request wipe at given offset with given size of a block device.

    This is a single-node call.

    @param bdev: the device, sent as the result of its C{ToDict()}
    @param offset: forwarded unchanged to the node
    @param size: forwarded unchanged to the node

    """
    params = [bdev.ToDict(), offset, size]
    return self._SingleNodeCall(node, "blockdev_wipe", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    @param bdev: the device, sent as the result of its C{ToDict()}

    """
    params = [bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_remove", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
    """Request rename of the given block devices.

    This is a single-node call.

    @param devlist: iterable of C{(device, uid)} pairs; each device is sent
        as the result of its C{ToDict()}

    """
    renames = [(dev.ToDict(), new_uid) for (dev, new_uid) in devlist]
    return self._SingleNodeCall(node, "blockdev_rename", renames)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_pause_resume_sync(self, node, disks, pause):
    """Request a pause/resume of given block device.

    This is a single-node call.

    @param disks: the devices, each sent as the result of its C{ToDict()}
    @param pause: forwarded unchanged to the node

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
                                [disk_dicts, pause])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
    """Request assembling of a given block device.

    This is a single-node call.

    @param disk: the device, sent as the result of its C{ToDict()}

    The remaining arguments are forwarded unchanged, in order.

    """
    params = [disk.ToDict(), owner, on_primary, idx]
    return self._SingleNodeCall(node, "blockdev_assemble", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    @param disk: the device, sent as the result of its C{ToDict()}

    """
    params = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_shutdown", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_addchildren(self, node, bdev, ndevs):
    """Request adding a list of children to a (mirroring) device.

    This is a single-node call.

    @param bdev: the parent device, sent as the result of its C{ToDict()}
    @param ndevs: the child devices, each sent as the result of its
        C{ToDict()}

    """
    return self._SingleNodeCall(node, "blockdev_addchildren",
                                [bdev.ToDict(),
                                 [disk.ToDict() for disk in ndevs]])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_removechildren(self, node, bdev, ndevs):
    """Request removing a list of children from a (mirroring) device.

    This is a single-node call.

    @param bdev: the parent device, sent as the result of its C{ToDict()}
    @param ndevs: the child devices, each sent as the result of its
        C{ToDict()}

    """
    return self._SingleNodeCall(node, "blockdev_removechildren",
                                [bdev.ToDict(),
                                 [disk.ToDict() for disk in ndevs]])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getmirrorstatus(self, node, disks):
    """Request status of a (mirroring) device.

    This is a single-node call.

    @param disks: the devices, each sent as the result of its C{ToDict()}
    @return: the call result; on success its payload is converted to a
        list of L{objects.BlockDevStatus}

    """
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                  [dsk.ToDict() for dsk in disks])
    if not result.fail_msg:
        result.payload = [objects.BlockDevStatus.FromDict(i)
                          for i in result.payload]
    return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
    """Request status of (mirroring) devices from multiple nodes.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param node_disks: dict mapping a name to its list of disks; every
        disk is sent as the result of its C{ToDict()}
    @return: per-node results; in successful results each
        C{(success, status)} payload entry with a true C{success} has its
        status converted to L{objects.BlockDevStatus}

    """
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
                                 [dict((name, [dsk.ToDict() for dsk in disks])
                                       for name, disks in node_disks.items())])
    for nres in result.values():
        if nres.fail_msg:
            # Failed calls have no payload to convert
            continue

        for idx, (success, status) in enumerate(nres.payload):
            # Presumably a failed entry carries an error description
            # rather than a status dict -- verify against the node side
            if success:
                nres.payload[idx] = (success,
                                     objects.BlockDevStatus.FromDict(status))

    return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_find(self, node, disk):
    """Request identification of a given block device.

    This is a single-node call.

    @param disk: the device, sent as the result of its C{ToDict()}
    @return: the call result; a successful, non-None payload is converted
        to L{objects.BlockDevStatus}

    """
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
    if not result.fail_msg and result.payload is not None:
        result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
    """Closes the given block devices.

    This is a single-node call.

    @param instance_name: forwarded unchanged to the node
    @param disks: the devices, each sent as the result of its C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
    """Returns the size of the given disks.

    This is a single-node call.

    @param disks: the devices, each sent as the result of its C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnects the network of the given drbd devices.

    This is a multi-node call.

    @param nodes_ip: forwarded unchanged to the nodes
    @param disks: the devices, each sent as the result of its C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Attaches the network of the given drbd devices.

    This is a multi-node call.

    @param nodes_ip: forwarded unchanged to the nodes
    @param disks: the devices, each sent as the result of its C{ToDict()}
    @param instance_name: forwarded unchanged to the nodes
    @param multimaster: forwarded unchanged to the nodes

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    params = [nodes_ip, disk_dicts, instance_name, multimaster]
    return self._MultiNodeCall(node_list, "drbd_attach_net", params)
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Waits until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param nodes_ip: forwarded unchanged to the nodes
    @param disks: the devices, each sent as the result of its C{ToDict()}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
                               [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
    """Gets drbd helper.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    no_params = []
    return self._MultiNodeCall(node_list, "drbd_helper", no_params)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a file.

    The file is read locally and sent together with its mode, its owner
    and group (as looked up through C{runtime.GetEnts()}) and its access
    and modification times. The node will refuse the operation in case the
    file is not on the approved file list.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @type file_name: str
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed

    """
    file_contents = utils.ReadFile(file_name)
    data = cls._Compress(file_contents)
    st = os.stat(file_name)
    getents = runtime.GetEnts()
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                    address_list=address_list)
1208 @_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
    """Ask the given nodes to write their ssconf files.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param values: forwarded unchanged as the only RPC argument
    @return: whatever C{_StaticMultiNodeCall} returns

    """
    rpc_args = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files",
                                    rpc_args)
1217 @_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
    """Invoke the C{run_oob} remote procedure on a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param oob_program: forwarded unchanged
    @param command: forwarded unchanged
    @param remote_node: forwarded unchanged
    @param timeout: forwarded unchanged as the last RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [oob_program, command, remote_node, timeout]
    return self._SingleNodeCall(node, "run_oob", rpc_args)
1227 @_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
    """Ask the given nodes for a diagnose of their OS definitions.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: whatever C{_MultiNodeCall} returns

    """
    no_args = []
    return self._MultiNodeCall(node_list, "os_diagnose", no_args)
1236 @_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
    """Fetch an OS definition from a node.

    This is a single-node call.

    When the call succeeded and the payload is a dictionary, the payload
    is replaced in place by the L{objects.OS} built from it; otherwise the
    result is handed back untouched.

    @param node: the node the request is sent to
    @param name: forwarded unchanged as the only RPC argument
    @return: the result object obtained from C{_SingleNodeCall}

    """
    result = self._SingleNodeCall(node, "os_get", [name])
    if not result.fail_msg and isinstance(result.payload, dict):
        result.payload = objects.OS.FromDict(result.payload)
    # The caller needs the (possibly decoded) result object back.
    return result
1248 @_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
    """Run the validation routine of an OS on the given nodes.

    This is a multi-node call.

    @param required: forwarded unchanged as the first RPC argument
    @param nodes: the nodes the request is sent to
    @param name: forwarded unchanged
    @param checks: forwarded unchanged
    @param params: forwarded unchanged
    @return: whatever C{_MultiNodeCall} returns

    """
    rpc_args = [required, name, checks, params]
    return self._MultiNodeCall(nodes, "os_validate", rpc_args)
1258 @_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner on the given nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param hpath: forwarded unchanged as the first RPC argument
    @param phase: forwarded unchanged
    @param env: a dictionary with the environment
    @return: whatever C{_MultiNodeCall} returns

    """
    return self._MultiNodeCall(node_list, "hooks_runner",
                               [hpath, phase, env])
1272 @_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
    """Run an iallocator on a remote node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: the iallocator name
    @param idata: the json-encoded input string
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [name, idata]
    return self._SingleNodeCall(node, "iallocator_runner", rpc_args)
1285 @_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
    """Request growing the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: object providing C{ToDict()}; it is serialized before
        being sent
    @param amount: forwarded unchanged
    @param dryrun: forwarded unchanged
    @return: whatever C{_SingleNodeCall} returns

    """
    bdev_dict = cf_bdev.ToDict()
    rpc_args = [bdev_dict, amount, dryrun]
    return self._SingleNodeCall(node, "blockdev_grow", rpc_args)
1295 @_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
    """Export a given disk to another node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: object providing C{ToDict()}; it is serialized before
        being sent
    @param dest_node: forwarded unchanged
    @param dest_path: forwarded unchanged
    @param cluster_name: forwarded unchanged as the last RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    # cluster_name completes the argument list; without it the parameter
    # would be accepted but never transmitted.
    rpc_args = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
    return self._SingleNodeCall(node, "blockdev_export", rpc_args)
1307 @_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: object providing C{ToDict()}; it is serialized before
        being sent
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", rpc_args)
1316 @_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param node: the node the request is sent to
    @param instance: passed through C{self._InstDict} before being sent
    @param snap_disks: iterable whose entries are either booleans, which
        are sent as they are, or objects providing C{ToDict()}, which are
        serialized
    @return: whatever C{_SingleNodeCall} returns

    """
    # Booleans have no ToDict(), so they must bypass serialization.
    flat_disks = [
        disk if isinstance(disk, bool) else disk.ToDict()
        for disk in snap_disks
    ]
    return self._SingleNodeCall(node, "finalize_export",
                                [self._InstDict(instance), flat_disks])
1335 @_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
    """Query the export information stored in a given path.

    This is a single-node call.

    @param node: the node the request is sent to
    @param path: forwarded unchanged as the only RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [path]
    return self._SingleNodeCall(node, "export_info", rpc_args)
1344 @_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
    """Get the list of exports stored on the given nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: whatever C{_MultiNodeCall} returns

    """
    no_args = []
    return self._MultiNodeCall(node_list, "export_list", no_args)
1353 @_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
    """Request the removal of a given export.

    This is a single-node call.

    @param node: the node the request is sent to
    @param export: forwarded unchanged as the only RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [export]
    return self._SingleNodeCall(node, "export_remove", rpc_args)
1363 @_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Request a node to clean the cluster information it has.

    According to the original documentation this removes the
    configuration information from the ganeti data directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param modify_ssh_setup: forwarded unchanged as the only RPC argument
    @return: whatever C{_StaticSingleNodeCall} returns

    """
    # modify_ssh_setup is the only argument of the remote procedure;
    # without it the parameter would be accepted but never transmitted.
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                     [modify_ssh_setup])
1376 @_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
    """Get all volumes on the given node(s).

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: whatever C{_MultiNodeCall} returns

    """
    no_args = []
    return self._MultiNodeCall(node_list, "node_volumes", no_args)
1385 @_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: the node the request is sent to
    @return: whatever C{_SingleNodeCall} returns

    """
    no_args = []
    return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
1394 @_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
    """Try to powercycle a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param hypervisor: forwarded unchanged as the only RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", rpc_args)
def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on the given node(s).

    This is a multi-node call.

    The read timeout handed to C{_MultiNodeCall} is the requested
    duration plus five, truncated to an integer.

    @param node_list: the nodes the request is sent to
    @param duration: forwarded unchanged as the only RPC argument
    @return: whatever C{_MultiNodeCall} returns

    """
    read_timeout = int(duration + 5)
    return self._MultiNodeCall(node_list, "test_delay", [duration],
                               read_timeout=read_timeout)
1413 @_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
    """Create the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: forwarded unchanged as the only RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    # The directory is the only argument of the remote procedure;
    # without it the parameter would be accepted but never transmitted.
    return self._SingleNodeCall(node, "file_storage_dir_create",
                                [file_storage_dir])
1423 @_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Remove the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: forwarded unchanged as the only RPC argument
    @return: whatever C{_SingleNodeCall} returns

    """
    # The directory is the only argument of the remote procedure;
    # without it the parameter would be accepted but never transmitted.
    return self._SingleNodeCall(node, "file_storage_dir_remove",
                                [file_storage_dir])
1433 @_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename a file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param old_file_storage_dir: forwarded unchanged as the first RPC
        argument
    @param new_file_storage_dir: forwarded unchanged as the second RPC
        argument
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
1445 @_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update a job queue file on the given nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: handed to C{_StaticMultiNodeCall} as the
        C{address_list} keyword argument
    @param file_name: forwarded unchanged as the first RPC argument
    @param content: passed through C{cls._Compress} before being sent
    @return: whatever C{_StaticMultiNodeCall} returns

    """
    rpc_args = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                    address_list=address_list)
1457 @_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
    """Invoke the C{jobqueue_purge} remote procedure on a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @return: whatever C{_StaticSingleNodeCall} returns

    """
    no_args = []
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
1467 @_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file on the given nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: handed to C{_StaticMultiNodeCall} as the
        C{address_list} keyword argument
    @param rename: used directly as the RPC argument list (it is not
        wrapped in another list)
    @return: whatever C{_StaticMultiNodeCall} returns

    """
    rpc_args = rename
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rpc_args,
                                    address_list=address_list)
1477 @_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Validate the hypervisor params.

    This is a multi-node call.

    The parameters sent are the given ones layered, through
    L{objects.FillDict}, on top of whatever the cluster configuration
    holds for this hypervisor (an empty dictionary when it has no entry).

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated
    @return: whatever C{_MultiNodeCall} returns

    """
    cluster = self._cfg.GetClusterInfo()
    cluster_defaults = cluster.hvparams.get(hvname, {})
    hv_full = objects.FillDict(cluster_defaults, hvparams)
    # Send the merged parameters, not just the caller-supplied subset.
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               [hvname, hv_full])
1496 @_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
    """Create a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: the node the request is sent to
    @param validity: Validity in seconds
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [validity]
    return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
1508 @_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
    """Remove an X509 certificate.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: Certificate name
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
1520 @_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, component,
                      dest, dest_args):
    """Start a listener for an import.

    This is a single-node call.

    @param node: Node name
    @param opts: object providing C{ToDict()}; it is serialized before
        being sent
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: forwarded unchanged and also handed to
        C{_EncodeImportExportIO}
    @param dest_args: encoded by C{_EncodeImportExportIO} together with
        C{dest} before being sent
    @return: whatever C{_SingleNodeCall} returns

    """
    # NOTE(review): the leading opts.ToDict() element was reconstructed by
    # analogy with call_export_start -- confirm against the node daemon.
    rpc_args = [
        opts.ToDict(),
        self._InstDict(instance),
        component,
        dest,
        _EncodeImportExportIO(dest, dest_args),
    ]
    return self._SingleNodeCall(node, "import_start", rpc_args)
1540 @_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, component, source, source_args):
    """Start an export daemon.

    This is a single-node call.

    @param node: Node name
    @param opts: object providing C{ToDict()}; it is serialized before
        being sent
    @param host: forwarded unchanged
    @param port: forwarded unchanged
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being exported
    @param source: forwarded unchanged and also handed to
        C{_EncodeImportExportIO}
    @param source_args: encoded by C{_EncodeImportExportIO} together with
        C{source} before being sent
    @return: whatever C{_SingleNodeCall} returns

    """
    # component and source sit between the instance dictionary and the
    # encoded source arguments, mirroring the layout of import_start.
    rpc_args = [
        opts.ToDict(),
        host,
        port,
        self._InstDict(instance),
        component,
        source,
        _EncodeImportExportIO(source, source_args),
    ]
    return self._SingleNodeCall(node, "export_start", rpc_args)
1561 @_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
    """Get the status of an import or export.

    This is a single-node call.

    When the call succeeded, every non-C{None} payload entry is replaced
    by the L{objects.ImportExportStatus} built from it; C{None} entries
    are kept. A failed result is handed back untouched.

    @param node: Node name
    @type names: List of strings
    @param names: Import/export names
    @rtype: List of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or
        None if a status couldn't be retrieved

    """
    result = self._SingleNodeCall(node, "impexp_status", [names])

    if not result.fail_msg:
        # None marks a status that could not be retrieved; it must not
        # be fed to FromDict.
        result.payload = [
            None if entry is None
            else objects.ImportExportStatus.FromDict(entry)
            for entry in result.payload
        ]

    return result
1591 @_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
    """Abort an import or export.

    This is a single-node call.

    @param node: Node name
    @param name: Import/export name
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_abort", rpc_args)
1605 @_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
    """Clean up after an import or export.

    This is a single-node call.

    @param node: Node name
    @param name: Import/export name
    @return: whatever C{_SingleNodeCall} returns

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)