4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
# C0103: Invalid name, since the call_* method names are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client # pylint: disable-msg=W0611
# Timeout for connecting to nodes (seconds); applied as PycURL's
# CONNECTTIMEOUT option by _ConfigRpcCurl
_RPC_CONNECT_TIMEOUT = 5
56 _RPC_CLIENT_HEADERS = [
57 "Content-type: %s" % http.HTTP_APP_JSON,
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
81 """Initializes the module-global HTTP client manager.
83 Must be called before using any RPC function and while exactly one thread is
87 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88 # one thread running. This check is just a safety measure -- it doesn't
90 assert threading.activeCount() == 1, \
91 "Found more than one active thread when initializing pycURL"
93 logging.info("Using PycURL %s", pycurl.version)
95 pycurl.global_init(pycurl.GLOBAL_ALL)
99 """Stops the module-global HTTP client manager.
101 Must be called before quitting the program and while exactly one thread is
105 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the node-daemon connection settings to a PycURL handle.

  The node daemon certificate file is used as the CA bundle for verifying
  the peer, and also as the client certificate and client key (PEM format).
  Redirects are not followed, and connection attempts are bounded by
  L{_RPC_CONNECT_TIMEOUT}.

  @param curl: the PycURL handle to configure

  """
  cert_file = str(constants.NODED_CERT_FILE)

  # The peer certificate is verified, but the host name is not
  # (SSL_VERIFYHOST 0) -- presumably because nodes are contacted by IP
  # address rather than by name; confirm before changing.
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_file),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_file),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_file),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
# Aliasing this module avoids the following warning by epydoc: "Warning: No
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
# The alias is used as the base-class reference of _RpcThreadLocal.
_threading = threading
127 class _RpcThreadLocal(_threading.local):
128 def GetHttpClientPool(self):
129 """Returns a per-thread HTTP client pool.
131 @rtype: L{http.client.HttpClientPool}
136 except AttributeError:
137 pool = http.client.HttpClientPool(_ConfigRpcCurl)
143 # Remove module alias (see above)
147 _thread_local = _RpcThreadLocal()
150 def _RpcTimeout(secs):
151 """Timeout decorator.
153 When applied to a rpc call_* function, it updates the global timeout
154 table with the given function/timeout.
159 assert name.startswith("call_")
160 _TIMEOUTS[name[len("call_"):]] = secs
166 """RPC-wrapper decorator.
168 When applied to a function, it runs it with the RPC system
169 initialized, and it shutsdown the system afterwards. This means the
170 function must be called without RPC being initialized.
173 def wrapper(*args, **kwargs):
176 return fn(*args, **kwargs)
182 class RpcResult(object):
185 This class holds an RPC result. It is needed since in multi-node
186 calls we can't raise an exception just because one one out of many
187 failed, and therefore we use this class to encapsulate the result.
189 @ivar data: the data payload, for successful results, or None
190 @ivar call: the name of the RPC call
191 @ivar node: the name of the node to which we made the call
192 @ivar offline: whether the operation failed because the node was
193 offline, as opposed to actual failure; offline=True will always
194 imply failed=True, in order to allow simpler checking if
195 the user doesn't care about the exact failure mode
196 @ivar fail_msg: the error message if the call failed
199 def __init__(self, data=None, failed=False, offline=False,
200 call=None, node=None):
201 self.offline = offline
206 self.fail_msg = "Node is marked offline"
207 self.data = self.payload = None
209 self.fail_msg = self._EnsureErr(data)
210 self.data = self.payload = None
213 if not isinstance(self.data, (tuple, list)):
214 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
218 self.fail_msg = ("RPC layer error: invalid result length (%d), "
219 "expected 2" % len(self.data))
221 elif not self.data[0]:
222 self.fail_msg = self._EnsureErr(self.data[1])
227 self.payload = data[1]
229 for attr_name in ["call", "data", "fail_msg",
230 "node", "offline", "payload"]:
231 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
235 """Helper to ensure we return a 'True' value for error."""
239 return "No error information"
241 def Raise(self, msg, prereq=False, ecode=None):
242 """If the result has failed, raise an OpExecError.
244 This is used so that LU code doesn't have to check for each
245 result, but instead can call this function.
248 if not self.fail_msg:
251 if not msg: # one could pass None for default message
252 msg = ("Call '%s' to node '%s' has failed: %s" %
253 (self.call, self.node, self.fail_msg))
255 msg = "%s: %s" % (msg, self.fail_msg)
257 ec = errors.OpPrereqError
259 ec = errors.OpExecError
260 if ecode is not None:
264 raise ec(*args) # pylint: disable-msg=W0142
267 def _AddressLookup(node_list,
268 ssc=ssconf.SimpleStore,
269 nslookup_fn=netutils.Hostname.GetIP):
270 """Return addresses for given node names.
272 @type node_list: list
273 @param node_list: List of node names
275 @param ssc: SimpleStore class that is used to obtain node->ip mappings
276 @type nslookup_fn: callable
277 @param nslookup_fn: function use to do NS lookup
278 @rtype: list of addresses and/or None's
279 @returns: List of corresponding addresses, if found
283 iplist = ss.GetNodePrimaryIPList()
284 family = ss.GetPrimaryIPFamily()
286 ipmap = dict(entry.split() for entry in iplist)
287 for node in node_list:
288 address = ipmap.get(node)
290 address = nslookup_fn(node, family=family)
291 addresses.append(address)
299 This class, given a (remote) method name, a list of parameters and a
300 list of nodes, will contact (in parallel) all nodes, and return a
301 dict of results (key: node name, value: result).
303 One current bug is that generic failure is still signaled by
304 'False' result, which is not good. This overloading of values can
308 def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311 self.procedure = procedure
315 self._address_lookup_fn = address_lookup_fn
317 def ConnectList(self, node_list, address_list=None, read_timeout=None):
318 """Add a list of nodes to the target nodes.
320 @type node_list: list
321 @param node_list: the list of node names to connect
322 @type address_list: list or None
323 @keyword address_list: either None or a list with node addresses,
324 which must have the same length as the node list
325 @type read_timeout: int
326 @param read_timeout: overwrites default timeout for operation
329 if address_list is None:
330 # Always use IP address instead of node name
331 address_list = self._address_lookup_fn(node_list)
333 assert len(node_list) == len(address_list), \
334 "Name and address lists must have the same length"
336 for node, address in zip(node_list, address_list):
337 self.ConnectNode(node, address, read_timeout=read_timeout)
339 def ConnectNode(self, name, address=None, read_timeout=None):
340 """Add a node to the target list.
343 @param name: the node name
345 @param address: the node address, if known
346 @type read_timeout: int
347 @param read_timeout: overwrites default timeout for operation
351 # Always use IP address instead of node name
352 address = self._address_lookup_fn([name])[0]
354 assert(address is not None)
356 if read_timeout is None:
357 read_timeout = _TIMEOUTS[self.procedure]
359 self._request[name] = \
360 http.client.HttpClientRequest(str(address), self.port,
361 http.HTTP_PUT, str("/%s" % self.procedure),
362 headers=_RPC_CLIENT_HEADERS,
363 post_data=str(self.body),
364 read_timeout=read_timeout)
366 def GetResults(self, http_pool=None):
367 """Call nodes and return results.
370 @return: List of RPC results
374 http_pool = _thread_local.GetHttpClientPool()
376 http_pool.ProcessRequests(self._request.values())
380 for name, req in self._request.iteritems():
381 if req.success and req.resp_status_code == http.HTTP_OK:
382 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383 node=name, call=self.procedure)
386 # TODO: Better error reporting
392 logging.error("RPC error in %s from node %s: %s",
393 self.procedure, name, msg)
394 results[name] = RpcResult(data=msg, failed=True, node=name,
400 def _EncodeImportExportIO(ieio, ieioargs):
401 """Encodes import/export I/O information.
404 if ieio == constants.IEIO_RAW_DISK:
405 assert len(ieioargs) == 1
406 return (ieioargs[0].ToDict(), )
408 if ieio == constants.IEIO_SCRIPT:
409 assert len(ieioargs) == 2
410 return (ieioargs[0].ToDict(), ieioargs[1])
415 class RpcRunner(object):
416 """RPC runner class"""
418 def __init__(self, cfg):
419 """Initialized the rpc runner.
421 @type cfg: C{config.ConfigWriter}
422 @param cfg: the configuration object that will be used to get data
427 self.port = netutils.GetDaemonPort(constants.NODED)
429 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430 """Convert the given instance to a dict.
432 This is done via the instance's ToDict() method and additionally
433 we fill the hvparams with the cluster defaults.
435 @type instance: L{objects.Instance}
436 @param instance: an Instance object
437 @type hvp: dict or None
438 @param hvp: a dictionary with overridden hypervisor parameters
439 @type bep: dict or None
440 @param bep: a dictionary with overridden backend parameters
441 @type osp: dict or None
442 @param osp: a dictionary with overridden os parameters
444 @return: the instance dict, with the hvparams filled with the
448 idict = instance.ToDict()
449 cluster = self._cfg.GetClusterInfo()
450 idict["hvparams"] = cluster.FillHV(instance)
452 idict["hvparams"].update(hvp)
453 idict["beparams"] = cluster.FillBE(instance)
455 idict["beparams"].update(bep)
456 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458 idict["osparams"].update(osp)
459 for nic in idict["nics"]:
460 nic['nicparams'] = objects.FillDict(
461 cluster.nicparams[constants.PP_DEFAULT],
465 def _ConnectList(self, client, node_list, call, read_timeout=None):
466 """Helper for computing node addresses.
468 @type client: L{ganeti.rpc.Client}
469 @param client: a C{Client} instance
470 @type node_list: list
471 @param node_list: the node list we should connect
473 @param call: the name of the remote procedure call, for filling in
474 correctly any eventual offline nodes' results
475 @type read_timeout: int
476 @param read_timeout: overwrites the default read timeout for the
480 all_nodes = self._cfg.GetAllNodesInfo()
484 for node in node_list:
485 if node in all_nodes:
486 if all_nodes[node].offline:
487 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489 val = all_nodes[node].primary_ip
492 addr_list.append(val)
493 name_list.append(node)
495 client.ConnectList(name_list, address_list=addr_list,
496 read_timeout=read_timeout)
499 def _ConnectNode(self, client, node, call, read_timeout=None):
500 """Helper for computing one node's address.
502 @type client: L{ganeti.rpc.Client}
503 @param client: a C{Client} instance
505 @param node: the node we should connect
507 @param call: the name of the remote procedure call, for filling in
508 correctly any eventual offline nodes' results
509 @type read_timeout: int
510 @param read_timeout: overwrites the default read timeout for the
514 node_info = self._cfg.GetNodeInfo(node)
515 if node_info is not None:
516 if node_info.offline:
517 return RpcResult(node=node, offline=True, call=call)
518 addr = node_info.primary_ip
521 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524 """Helper for making a multi-node call
527 body = serializer.DumpJson(args, indent=False)
528 c = Client(procedure, body, self.port)
529 skip_dict = self._ConnectList(c, node_list, procedure,
530 read_timeout=read_timeout)
531 skip_dict.update(c.GetResults())
535 def _StaticMultiNodeCall(cls, node_list, procedure, args,
536 address_list=None, read_timeout=None):
537 """Helper for making a multi-node static call
540 body = serializer.DumpJson(args, indent=False)
541 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
542 c.ConnectList(node_list, address_list=address_list,
543 read_timeout=read_timeout)
544 return c.GetResults()
546 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547 """Helper for making a single-node call
550 body = serializer.DumpJson(args, indent=False)
551 c = Client(procedure, body, self.port)
552 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554 # we did connect, node is not offline
555 result = c.GetResults()[node]
559 def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
560 """Helper for making a single-node static call
563 body = serializer.DumpJson(args, indent=False)
564 c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
565 c.ConnectNode(node, read_timeout=read_timeout)
566 return c.GetResults()[node]
570 """Compresses a string for transport over RPC.
572 Small amounts of data are not compressed.
577 @return: Encoded data to send
580 # Small amounts of data are not compressed
582 return (constants.RPC_ENCODING_NONE, data)
584 # Compress with zlib and encode in base64
585 return (constants.RPC_ENCODING_ZLIB_BASE64,
586 base64.b64encode(zlib.compress(data, 3)))
592 @_RpcTimeout(_TMO_URGENT)
593 def call_lv_list(self, node_list, vg_name):
594 """Gets the logical volumes present in a given volume group.
596 This is a multi-node call.
599 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
601 @_RpcTimeout(_TMO_URGENT)
602 def call_vg_list(self, node_list):
603 """Gets the volume group list.
605 This is a multi-node call.
608 return self._MultiNodeCall(node_list, "vg_list", [])
610 @_RpcTimeout(_TMO_NORMAL)
611 def call_storage_list(self, node_list, su_name, su_args, name, fields):
612 """Get list of storage units.
614 This is a multi-node call.
617 return self._MultiNodeCall(node_list, "storage_list",
618 [su_name, su_args, name, fields])
620 @_RpcTimeout(_TMO_NORMAL)
621 def call_storage_modify(self, node, su_name, su_args, name, changes):
622 """Modify a storage unit.
624 This is a single-node call.
627 return self._SingleNodeCall(node, "storage_modify",
628 [su_name, su_args, name, changes])
630 @_RpcTimeout(_TMO_NORMAL)
631 def call_storage_execute(self, node, su_name, su_args, name, op):
632 """Executes an operation on a storage unit.
634 This is a single-node call.
637 return self._SingleNodeCall(node, "storage_execute",
638 [su_name, su_args, name, op])
640 @_RpcTimeout(_TMO_URGENT)
641 def call_bridges_exist(self, node, bridges_list):
642 """Checks if a node has all the bridges given.
644 This method checks if all bridges given in the bridges_list are
645 present on the remote node, so that an instance that uses interfaces
646 on those bridges can be started.
648 This is a single-node call.
651 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
653 @_RpcTimeout(_TMO_NORMAL)
654 def call_instance_start(self, node, instance, hvp, bep):
655 """Starts an instance.
657 This is a single-node call.
660 idict = self._InstDict(instance, hvp=hvp, bep=bep)
661 return self._SingleNodeCall(node, "instance_start", [idict])
663 @_RpcTimeout(_TMO_NORMAL)
664 def call_instance_shutdown(self, node, instance, timeout):
665 """Stops an instance.
667 This is a single-node call.
670 return self._SingleNodeCall(node, "instance_shutdown",
671 [self._InstDict(instance), timeout])
673 @_RpcTimeout(_TMO_NORMAL)
674 def call_migration_info(self, node, instance):
675 """Gather the information necessary to prepare an instance migration.
677 This is a single-node call.
680 @param node: the node on which the instance is currently running
681 @type instance: C{objects.Instance}
682 @param instance: the instance definition
685 return self._SingleNodeCall(node, "migration_info",
686 [self._InstDict(instance)])
688 @_RpcTimeout(_TMO_NORMAL)
689 def call_accept_instance(self, node, instance, info, target):
690 """Prepare a node to accept an instance.
692 This is a single-node call.
695 @param node: the target node for the migration
696 @type instance: C{objects.Instance}
697 @param instance: the instance definition
698 @type info: opaque/hypervisor specific (string/data)
699 @param info: result for the call_migration_info call
701 @param target: target hostname (usually ip address) (on the node itself)
704 return self._SingleNodeCall(node, "accept_instance",
705 [self._InstDict(instance), info, target])
707 @_RpcTimeout(_TMO_NORMAL)
708 def call_finalize_migration(self, node, instance, info, success):
709 """Finalize any target-node migration specific operation.
711 This is called both in case of a successful migration and in case of error
712 (in which case it should abort the migration).
714 This is a single-node call.
717 @param node: the target node for the migration
718 @type instance: C{objects.Instance}
719 @param instance: the instance definition
720 @type info: opaque/hypervisor specific (string/data)
721 @param info: result for the call_migration_info call
722 @type success: boolean
723 @param success: whether the migration was a success or a failure
726 return self._SingleNodeCall(node, "finalize_migration",
727 [self._InstDict(instance), info, success])
729 @_RpcTimeout(_TMO_SLOW)
730 def call_instance_migrate(self, node, instance, target, live):
731 """Migrate an instance.
733 This is a single-node call.
736 @param node: the node on which the instance is currently running
737 @type instance: C{objects.Instance}
738 @param instance: the instance definition
740 @param target: the target node name
742 @param live: whether the migration should be done live or not (the
743 interpretation of this parameter is left to the hypervisor)
746 return self._SingleNodeCall(node, "instance_migrate",
747 [self._InstDict(instance), target, live])
749 @_RpcTimeout(_TMO_NORMAL)
750 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
751 """Reboots an instance.
753 This is a single-node call.
756 return self._SingleNodeCall(node, "instance_reboot",
757 [self._InstDict(inst), reboot_type,
760 @_RpcTimeout(_TMO_1DAY)
761 def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
762 """Installs an OS on the given instance.
764 This is a single-node call.
767 return self._SingleNodeCall(node, "instance_os_add",
768 [self._InstDict(inst, osp=osparams),
771 @_RpcTimeout(_TMO_SLOW)
772 def call_instance_run_rename(self, node, inst, old_name, debug):
773 """Run the OS rename script for an instance.
775 This is a single-node call.
778 return self._SingleNodeCall(node, "instance_run_rename",
779 [self._InstDict(inst), old_name, debug])
781 @_RpcTimeout(_TMO_URGENT)
782 def call_instance_info(self, node, instance, hname):
783 """Returns information about a single instance.
785 This is a single-node call.
788 @param node: the list of nodes to query
789 @type instance: string
790 @param instance: the instance name
792 @param hname: the hypervisor type of the instance
795 return self._SingleNodeCall(node, "instance_info", [instance, hname])
797 @_RpcTimeout(_TMO_NORMAL)
798 def call_instance_migratable(self, node, instance):
799 """Checks whether the given instance can be migrated.
801 This is a single-node call.
803 @param node: the node to query
804 @type instance: L{objects.Instance}
805 @param instance: the instance to check
809 return self._SingleNodeCall(node, "instance_migratable",
810 [self._InstDict(instance)])
812 @_RpcTimeout(_TMO_URGENT)
813 def call_all_instances_info(self, node_list, hypervisor_list):
814 """Returns information about all instances on the given nodes.
816 This is a multi-node call.
818 @type node_list: list
819 @param node_list: the list of nodes to query
820 @type hypervisor_list: list
821 @param hypervisor_list: the hypervisors to query for instances
824 return self._MultiNodeCall(node_list, "all_instances_info",
827 @_RpcTimeout(_TMO_URGENT)
828 def call_instance_list(self, node_list, hypervisor_list):
829 """Returns the list of running instances on a given node.
831 This is a multi-node call.
833 @type node_list: list
834 @param node_list: the list of nodes to query
835 @type hypervisor_list: list
836 @param hypervisor_list: the hypervisors to query for instances
839 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
841 @_RpcTimeout(_TMO_FAST)
842 def call_node_tcp_ping(self, node, source, target, port, timeout,
844 """Do a TcpPing on the remote node
846 This is a single-node call.
849 return self._SingleNodeCall(node, "node_tcp_ping",
850 [source, target, port, timeout,
853 @_RpcTimeout(_TMO_FAST)
854 def call_node_has_ip_address(self, node, address):
855 """Checks if a node has the given IP address.
857 This is a single-node call.
860 return self._SingleNodeCall(node, "node_has_ip_address", [address])
862 @_RpcTimeout(_TMO_URGENT)
863 def call_node_info(self, node_list, vg_name, hypervisor_type):
864 """Return node information.
866 This will return memory information and volume group size and free
869 This is a multi-node call.
871 @type node_list: list
872 @param node_list: the list of nodes to query
873 @type vg_name: C{string}
874 @param vg_name: the name of the volume group to ask for disk space
876 @type hypervisor_type: C{str}
877 @param hypervisor_type: the name of the hypervisor to ask for
881 return self._MultiNodeCall(node_list, "node_info",
882 [vg_name, hypervisor_type])
884 @_RpcTimeout(_TMO_NORMAL)
885 def call_etc_hosts_modify(self, node, mode, name, ip):
886 """Modify hosts file with name
889 @param node: The node to call
891 @param mode: The mode to operate. Currently "add" or "remove"
893 @param name: The host name to be modified
895 @param ip: The ip of the entry (just valid if mode is "add")
898 return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
900 @_RpcTimeout(_TMO_NORMAL)
901 def call_node_verify(self, node_list, checkdict, cluster_name):
902 """Request verification of given parameters.
904 This is a multi-node call.
907 return self._MultiNodeCall(node_list, "node_verify",
908 [checkdict, cluster_name])
911 @_RpcTimeout(_TMO_FAST)
912 def call_node_start_master(cls, node, start_daemons, no_voting):
913 """Tells a node to activate itself as a master.
915 This is a single-node call.
918 return cls._StaticSingleNodeCall(node, "node_start_master",
919 [start_daemons, no_voting])
922 @_RpcTimeout(_TMO_FAST)
923 def call_node_stop_master(cls, node, stop_daemons):
924 """Tells a node to demote itself from master status.
926 This is a single-node call.
929 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
932 @_RpcTimeout(_TMO_URGENT)
933 def call_master_info(cls, node_list):
934 """Query master info.
936 This is a multi-node call.
939 # TODO: should this method query down nodes?
940 return cls._StaticMultiNodeCall(node_list, "master_info", [])
943 @_RpcTimeout(_TMO_URGENT)
944 def call_version(cls, node_list):
945 """Query node version.
947 This is a multi-node call.
950 return cls._StaticMultiNodeCall(node_list, "version", [])
952 @_RpcTimeout(_TMO_NORMAL)
953 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
954 """Request creation of a given block device.
956 This is a single-node call.
959 return self._SingleNodeCall(node, "blockdev_create",
960 [bdev.ToDict(), size, owner, on_primary, info])
962 @_RpcTimeout(_TMO_SLOW)
963 def call_blockdev_wipe(self, node, bdev, offset, size):
964 """Request wipe at given offset with given size of a block device.
966 This is a single-node call.
969 return self._SingleNodeCall(node, "blockdev_wipe",
970 [bdev.ToDict(), offset, size])
972 @_RpcTimeout(_TMO_NORMAL)
973 def call_blockdev_remove(self, node, bdev):
974 """Request removal of a given block device.
976 This is a single-node call.
979 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
981 @_RpcTimeout(_TMO_NORMAL)
982 def call_blockdev_rename(self, node, devlist):
983 """Request rename of the given block devices.
985 This is a single-node call.
988 return self._SingleNodeCall(node, "blockdev_rename",
989 [(d.ToDict(), uid) for d, uid in devlist])
991 @_RpcTimeout(_TMO_NORMAL)
992 def call_blockdev_pause_resume_sync(self, node, disks, pause):
993 """Request a pause/resume of given block device.
995 This is a single-node call.
998 return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
999 [[bdev.ToDict() for bdev in disks], pause])
1001 @_RpcTimeout(_TMO_NORMAL)
1002 def call_blockdev_assemble(self, node, disk, owner, on_primary):
1003 """Request assembling of a given block device.
1005 This is a single-node call.
1008 return self._SingleNodeCall(node, "blockdev_assemble",
1009 [disk.ToDict(), owner, on_primary])
1011 @_RpcTimeout(_TMO_NORMAL)
1012 def call_blockdev_shutdown(self, node, disk):
1013 """Request shutdown of a given block device.
1015 This is a single-node call.
1018 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1020 @_RpcTimeout(_TMO_NORMAL)
1021 def call_blockdev_addchildren(self, node, bdev, ndevs):
1022 """Request adding a list of children to a (mirroring) device.
1024 This is a single-node call.
1027 return self._SingleNodeCall(node, "blockdev_addchildren",
1029 [disk.ToDict() for disk in ndevs]])
1031 @_RpcTimeout(_TMO_NORMAL)
1032 def call_blockdev_removechildren(self, node, bdev, ndevs):
1033 """Request removing a list of children from a (mirroring) device.
1035 This is a single-node call.
1038 return self._SingleNodeCall(node, "blockdev_removechildren",
1040 [disk.ToDict() for disk in ndevs]])
1042 @_RpcTimeout(_TMO_NORMAL)
1043 def call_blockdev_getmirrorstatus(self, node, disks):
1044 """Request status of a (mirroring) device.
1046 This is a single-node call.
1049 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1050 [dsk.ToDict() for dsk in disks])
1051 if not result.fail_msg:
1052 result.payload = [objects.BlockDevStatus.FromDict(i)
1053 for i in result.payload]
1056 @_RpcTimeout(_TMO_NORMAL)
1057 def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1058 """Request status of (mirroring) devices from multiple nodes.
1060 This is a multi-node call.
1063 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1064 [dict((name, [dsk.ToDict() for dsk in disks])
1065 for name, disks in node_disks.items())])
1066 for nres in result.values():
1070 for idx, (success, status) in enumerate(nres.payload):
1072 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1076 @_RpcTimeout(_TMO_NORMAL)
1077 def call_blockdev_find(self, node, disk):
1078 """Request identification of a given block device.
1080 This is a single-node call.
1083 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1084 if not result.fail_msg and result.payload is not None:
1085 result.payload = objects.BlockDevStatus.FromDict(result.payload)
1088 @_RpcTimeout(_TMO_NORMAL)
1089 def call_blockdev_close(self, node, instance_name, disks):
1090 """Closes the given block devices.
1092 This is a single-node call.
1095 params = [instance_name, [cf.ToDict() for cf in disks]]
1096 return self._SingleNodeCall(node, "blockdev_close", params)
1098 @_RpcTimeout(_TMO_NORMAL)
1099 def call_blockdev_getsizes(self, node, disks):
1100 """Returns the size of the given disks.
1102 This is a single-node call.
1105 params = [[cf.ToDict() for cf in disks]]
1106 return self._SingleNodeCall(node, "blockdev_getsize", params)
1108 @_RpcTimeout(_TMO_NORMAL)
1109 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1110 """Disconnects the network of the given drbd devices.
1112 This is a multi-node call.
1115 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1116 [nodes_ip, [cf.ToDict() for cf in disks]])
1118 @_RpcTimeout(_TMO_NORMAL)
1119 def call_drbd_attach_net(self, node_list, nodes_ip,
1120 disks, instance_name, multimaster):
1121 """Disconnects the given drbd devices.
1123 This is a multi-node call.
1126 return self._MultiNodeCall(node_list, "drbd_attach_net",
1127 [nodes_ip, [cf.ToDict() for cf in disks],
1128 instance_name, multimaster])
1130 @_RpcTimeout(_TMO_SLOW)
1131 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1132 """Waits for the synchronization of drbd devices is complete.
1134 This is a multi-node call.
1137 return self._MultiNodeCall(node_list, "drbd_wait_sync",
1138 [nodes_ip, [cf.ToDict() for cf in disks]])
1140 @_RpcTimeout(_TMO_URGENT)
1141 def call_drbd_helper(self, node_list):
1142 """Gets drbd helper.
1144 This is a multi-node call.
1147 return self._MultiNodeCall(node_list, "drbd_helper", [])
1150 @_RpcTimeout(_TMO_NORMAL)
1151 def call_upload_file(cls, node_list, file_name, address_list=None):
1154 The node will refuse the operation in case the file is not on the
1157 This is a multi-node call.
1159 @type node_list: list
1160 @param node_list: the list of node names to upload to
1161 @type file_name: str
1162 @param file_name: the filename to upload
1163 @type address_list: list or None
1164 @keyword address_list: an optional list of node addresses, in order
1165 to optimize the RPC speed
1168 file_contents = utils.ReadFile(file_name)
1169 data = cls._Compress(file_contents)
1170 st = os.stat(file_name)
1171 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1172 st.st_atime, st.st_mtime]
1173 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1174 address_list=address_list)
1177 @_RpcTimeout(_TMO_NORMAL)
1178 def call_write_ssconf_files(cls, node_list, values):
1179 """Write ssconf files.
1181 This is a multi-node call.
1184 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1186 @_RpcTimeout(_TMO_NORMAL)
1187 def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1190 This is a single-node call.
1193 return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1194 remote_node, timeout])
1196 @_RpcTimeout(_TMO_FAST)
1197 def call_os_diagnose(self, node_list):
1198 """Request a diagnose of OS definitions.
1200 This is a multi-node call.
1203 return self._MultiNodeCall(node_list, "os_diagnose", [])
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  @param node: the node to query
  @param name: the name of the OS definition to retrieve
  @return: the RPC result; on success, a dict payload is converted to
      an L{objects.OS} instance

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  # Only decode a successful, dict-shaped payload; anything else is handed
  # back to the caller untouched
  if not result.fail_msg and isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Run a validation routine for a given OS.

  This is a multi-node call.

  @param required: forwarded unchanged to the nodes (first RPC argument)
  @param nodes: the nodes on which to run the validation
  @param name: the OS name
  @param checks: the checks to run
  @param params: the parameters to validate

  """
  rpc_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes on which to run the hooks
  @param hpath: the hooks path, forwarded unchanged to the nodes
  @param phase: the hooks phase, forwarded unchanged to the nodes
  @type env: dict
  @param env: a dictionary with the environment

  """
  params = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", params)
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @param node: the node on which to run the iallocator
  @param name: the iallocator name
  @param idata: the json-encoded input string

  """
  return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Request growing of the given block device.

  This is a single-node call.

  @param node: the node holding the block device
  @param cf_bdev: the block device to grow (serialized via its C{ToDict}
      method)
  @param amount: the amount by which to grow the device

  """
  return self._SingleNodeCall(node, "blockdev_grow",
                              [cf_bdev.ToDict(), amount])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a given disk to another node.

  This is a single-node call.

  @param node: the node holding the disk
  @param cf_bdev: the disk to export (serialized via its C{ToDict} method)
  @param dest_node: the node to export to
  @param dest_path: the destination path on C{dest_node}
  @param cluster_name: the cluster name, forwarded unchanged

  """
  return self._SingleNodeCall(node, "blockdev_export",
                              [cf_bdev.ToDict(), dest_node, dest_path,
                               cluster_name])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param node: the node holding the block device
  @param cf_bdev: the block device to snapshot (serialized via its
      C{ToDict} method)

  """
  bdev_dict = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_snapshot", [bdev_dict])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @param node: the node on which to finalize the export
  @param instance: the instance being exported
  @type snap_disks: list
  @param snap_disks: the snapshot disks; a boolean entry is sent as-is,
      any other entry is serialized via its C{ToDict} method

  """
  # Boolean entries stand in for disks that have no snapshot object and
  # cannot be serialized; everything else is converted to a dict
  flat_disks = [disk if isinstance(disk, bool) else disk.ToDict()
                for disk in snap_disks]
  return self._SingleNodeCall(node, "finalize_export",
                              [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  @param node: the node to query
  @param path: the path holding the export

  """
  query_args = [path]
  return self._SingleNodeCall(node, "export_info", query_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Gets the stored exports list.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes whose exports should be listed

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Requests removal of a given export.

  This is a single-node call.

  @param node: the node holding the export
  @param export: the export to remove

  """
  removal_args = [export]
  return self._SingleNodeCall(node, "export_remove", removal_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  directory.

  This is a single-node call.

  @param node: the node that should leave the cluster
  @param modify_ssh_setup: forwarded unchanged to the node (presumably
      whether the node should also touch its SSH setup - confirm)

  """
  return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                   [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "node_volumes", no_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call.

  @param node: the node to demote

  """
  no_args = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Tries to powercycle a node.

  This is a single-node call.

  @param node: the node to powercycle
  @param hypervisor: forwarded unchanged to the node

  """
  cycle_args = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", cycle_args)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes that should sleep
  @param duration: how long the nodes should sleep

  """
  # The read timeout follows the requested delay, with 5 units of slack,
  # truncated to an integer
  read_timeout = int(duration + 5)
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=read_timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node on which to create the directory
  @param file_storage_dir: the directory to create

  """
  return self._SingleNodeCall(node, "file_storage_dir_create",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node holding the directory
  @param file_storage_dir: the directory to remove

  """
  return self._SingleNodeCall(node, "file_storage_dir_remove",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node holding the directory
  @param old_file_storage_dir: the current directory path
  @param new_file_storage_dir: the new directory path

  """
  rename_args = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", rename_args)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes to update
  @param address_list: the node addresses, passed on to the RPC layer
  @param file_name: the name of the job queue file to update
  @param content: the new file content; compressed before sending

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, cls._Compress(content)],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge the job queue on a node.

  This is a single-node call.

  @param node: the node whose job queue should be purged

  """
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the nodes on which to rename the file
  @param address_list: the node addresses, passed on to the RPC layer
  @param rename: the rename request(s); note that this is used directly
      as the RPC argument list, not wrapped in another list

  """
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  # Combine the cluster-level parameters for this hypervisor (or {} if the
  # cluster defines none) with the given ones, so the nodes validate the
  # complete set; presumably hvparams take precedence - see objects.FillDict
  hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Creates a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: Node name
  @param validity: Validity in seconds

  """
  cert_args = [validity]
  return self._SingleNodeCall(node, "x509_cert_create", cert_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Removes a X509 certificate.

  This is a single-node call.

  @param node: Node name
  @param name: Certificate name

  """
  cert_args = [name]
  return self._SingleNodeCall(node, "x509_cert_remove", cert_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @param node: Node name
  @param opts: import/export options (serialized via their C{ToDict}
      method)
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: the import destination
  @param dest_args: arguments for C{dest}; encoded together with it

  """
  return self._SingleNodeCall(node, "import_start",
                              [opts.ToDict(),
                               self._InstDict(instance), dest,
                               _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @param node: Node name
  @param opts: import/export options (serialized via their C{ToDict}
      method)
  @param host: the host to export to
  @param port: the port to export to
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: the export source
  @param source_args: arguments for C{source}; encoded together with it

  """
  rpc_args = [
    opts.ToDict(),
    host,
    port,
    self._InstDict(instance),
    source,
    _EncodeImportExportIO(source, source_args),
    ]
  return self._SingleNodeCall(node, "export_start", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @return: the RPC result; on success its payload is a list with the
      state of each named import/export (an L{objects.ImportExportStatus}
      instance), or None where a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  if not result.fail_msg:
    # None entries mark statuses that couldn't be retrieved; keep them
    result.payload = [None if status is None
                      else objects.ImportExportStatus.FromDict(status)
                      for status in result.payload]

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Aborts an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  abort_args = [name]
  return self._SingleNodeCall(node, "impexp_abort", abort_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Cleans up after an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  cleanup_args = [name]
  return self._SingleNodeCall(node, "impexp_cleanup", cleanup_args)