4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
50 # pylint has a bug here, doesn't see this import
51 import ganeti.http.client # pylint: disable-msg=W0611
54 # Timeout for connecting to nodes (seconds)
55 _RPC_CONNECT_TIMEOUT = 5
57 _RPC_CLIENT_HEADERS = [
58 "Content-type: %s" % http.HTTP_APP_JSON,
62 # Various time constants for the timeout table
63 _TMO_URGENT = 60 # one minute
64 _TMO_FAST = 5 * 60 # five minutes
65 _TMO_NORMAL = 15 * 60 # 15 minutes
66 _TMO_SLOW = 3600 # one hour
70 # Timeout table that will be built later by decorators
71 # Guidelines for choosing timeouts:
72 # - call used during watcher: timeout -> 1min, _TMO_URGENT
73 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74 # - other calls: 15 min, _TMO_NORMAL
75 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
82 """Initializes the module-global HTTP client manager.
84 Must be called before using any RPC function and while exactly one thread is
88 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89 # one thread running. This check is just a safety measure -- it doesn't
91 assert threading.activeCount() == 1, \
92 "Found more than one active thread when initializing pycURL"
94 logging.info("Using PycURL %s", pycurl.version)
96 pycurl.global_init(pycurl.GLOBAL_ALL)
100 """Stops the module-global HTTP client manager.
102 Must be called before quitting the program and while exactly one thread is
106 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
    """Applies the RPC client settings to a cURL handle.

    The node daemon certificate file is used as CA bundle, client
    certificate and client key alike (all in PEM format).

    @param curl: object offering a C{setopt(option, value)} method
        (presumably a C{pycurl.Curl} handle; confirm against the caller)

    """
    cert_path = str(constants.NODED_CERT_FILE)

    # Kept in the same order as the options were originally applied
    settings = [
        (pycurl.FOLLOWLOCATION, False),
        (pycurl.CAINFO, cert_path),
        # NOTE(review): host name verification is switched off while peer
        # verification stays on -- presumably because nodes are contacted
        # by IP address; confirm this is intended
        (pycurl.SSL_VERIFYHOST, 0),
        (pycurl.SSL_VERIFYPEER, True),
        (pycurl.SSLCERTTYPE, "PEM"),
        (pycurl.SSLCERT, cert_path),
        (pycurl.SSLKEYTYPE, "PEM"),
        (pycurl.SSLKEY, cert_path),
        (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

    for option, value in settings:
        curl.setopt(option, value)
123 # Aliasing this module avoids the following warning by epydoc: "Warning: No
124 # information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125 _threading = threading
128 class _RpcThreadLocal(_threading.local):
129 def GetHttpClientPool(self):
130 """Returns a per-thread HTTP client pool.
132 @rtype: L{http.client.HttpClientPool}
137 except AttributeError:
138 pool = http.client.HttpClientPool(_ConfigRpcCurl)
144 # Remove module alias (see above)
148 _thread_local = _RpcThreadLocal()
151 def _RpcTimeout(secs):
152 """Timeout decorator.
154 When applied to a rpc call_* function, it updates the global timeout
155 table with the given function/timeout.
160 assert name.startswith("call_")
161 _TIMEOUTS[name[len("call_"):]] = secs
167 """RPC-wrapper decorator.
169 When applied to a function, it runs it with the RPC system
170 initialized, and it shuts down the system afterwards. This means the
171 function must be called without RPC being initialized.
174 def wrapper(*args, **kwargs):
177 return fn(*args, **kwargs)
183 class RpcResult(object):
186 This class holds an RPC result. It is needed since in multi-node
187 calls we can't raise an exception just because one out of many
188 failed, and therefore we use this class to encapsulate the result.
190 @ivar data: the data payload, for successful results, or None
191 @ivar call: the name of the RPC call
192 @ivar node: the name of the node to which we made the call
193 @ivar offline: whether the operation failed because the node was
194 offline, as opposed to actual failure; offline=True will always
195 imply failed=True, in order to allow simpler checking if
196 the user doesn't care about the exact failure mode
197 @ivar fail_msg: the error message if the call failed
200 def __init__(self, data=None, failed=False, offline=False,
201 call=None, node=None):
202 self.offline = offline
207 self.fail_msg = "Node is marked offline"
208 self.data = self.payload = None
210 self.fail_msg = self._EnsureErr(data)
211 self.data = self.payload = None
214 if not isinstance(self.data, (tuple, list)):
215 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
219 self.fail_msg = ("RPC layer error: invalid result length (%d), "
220 "expected 2" % len(self.data))
222 elif not self.data[0]:
223 self.fail_msg = self._EnsureErr(self.data[1])
228 self.payload = data[1]
230 for attr_name in ["call", "data", "fail_msg",
231 "node", "offline", "payload"]:
232 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
236 """Helper to ensure we return a 'True' value for error."""
240 return "No error information"
242 def Raise(self, msg, prereq=False, ecode=None):
243 """If the result has failed, raise an OpExecError.
245 This is used so that LU code doesn't have to check for each
246 result, but instead can call this function.
249 if not self.fail_msg:
252 if not msg: # one could pass None for default message
253 msg = ("Call '%s' to node '%s' has failed: %s" %
254 (self.call, self.node, self.fail_msg))
256 msg = "%s: %s" % (msg, self.fail_msg)
258 ec = errors.OpPrereqError
260 ec = errors.OpExecError
261 if ecode is not None:
265 raise ec(*args) # pylint: disable-msg=W0142
268 def _AddressLookup(node_list,
269 ssc=ssconf.SimpleStore,
270 nslookup_fn=netutils.Hostname.GetIP):
271 """Return addresses for given node names.
273 @type node_list: list
274 @param node_list: List of node names
276 @param ssc: SimpleStore class that is used to obtain node->ip mappings
277 @type nslookup_fn: callable
278 @param nslookup_fn: function used to do NS lookup
279 @rtype: list of addresses and/or None's
280 @returns: List of corresponding addresses, if found
284 iplist = ss.GetNodePrimaryIPList()
285 family = ss.GetPrimaryIPFamily()
287 ipmap = dict(entry.split() for entry in iplist)
288 for node in node_list:
289 address = ipmap.get(node)
291 address = nslookup_fn(node, family=family)
292 addresses.append(address)
300 This class, given a (remote) method name, a list of parameters and a
301 list of nodes, will contact (in parallel) all nodes, and return a
302 dict of results (key: node name, value: result).
304 One current bug is that generic failure is still signaled by
305 'False' result, which is not good. This overloading of values can
309 def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
312 self.procedure = procedure
316 self._address_lookup_fn = address_lookup_fn
def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Registers several nodes as targets of this call.

    Each (name, address) pair is handed to L{ConnectNode}.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list; when None the
        addresses are obtained from the configured address lookup function
    @type read_timeout: int
    @param read_timeout: overwrites default timeout for operation

    """
    addresses = address_list
    if addresses is None:
        # Always use IP address instead of node name
        addresses = self._address_lookup_fn(node_list)

    assert len(node_list) == len(addresses), \
        "Name and address lists must have the same length"

    for node_name, node_address in zip(node_list, addresses):
        self.ConnectNode(node_name, node_address, read_timeout=read_timeout)
340 def ConnectNode(self, name, address=None, read_timeout=None):
341 """Add a node to the target list.
344 @param name: the node name
346 @param address: the node address, if known
347 @type read_timeout: int
348 @param read_timeout: overwrites default timeout for operation
352 # Always use IP address instead of node name
353 address = self._address_lookup_fn([name])[0]
355 assert(address is not None)
357 if read_timeout is None:
358 read_timeout = _TIMEOUTS[self.procedure]
360 self._request[name] = \
361 http.client.HttpClientRequest(str(address), self.port,
362 http.HTTP_PUT, str("/%s" % self.procedure),
363 headers=_RPC_CLIENT_HEADERS,
364 post_data=str(self.body),
365 read_timeout=read_timeout)
367 def GetResults(self, http_pool=None):
368 """Call nodes and return results.
371 @return: List of RPC results
375 http_pool = _thread_local.GetHttpClientPool()
377 http_pool.ProcessRequests(self._request.values())
381 for name, req in self._request.iteritems():
382 if req.success and req.resp_status_code == http.HTTP_OK:
383 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384 node=name, call=self.procedure)
387 # TODO: Better error reporting
393 logging.error("RPC error in %s from node %s: %s",
394 self.procedure, name, msg)
395 results[name] = RpcResult(data=msg, failed=True, node=name,
401 def _EncodeImportExportIO(ieio, ieioargs):
402 """Encodes import/export I/O information.
405 if ieio == constants.IEIO_RAW_DISK:
406 assert len(ieioargs) == 1
407 return (ieioargs[0].ToDict(), )
409 if ieio == constants.IEIO_SCRIPT:
410 assert len(ieioargs) == 2
411 return (ieioargs[0].ToDict(), ieioargs[1])
416 class RpcRunner(object):
417 """RPC runner class"""
419 def __init__(self, cfg):
420 """Initializes the rpc runner.
422 @type cfg: C{config.ConfigWriter}
423 @param cfg: the configuration object that will be used to get data
428 self.port = netutils.GetDaemonPort(constants.NODED)
430 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431 """Convert the given instance to a dict.
433 This is done via the instance's ToDict() method and additionally
434 we fill the hvparams with the cluster defaults.
436 @type instance: L{objects.Instance}
437 @param instance: an Instance object
438 @type hvp: dict or None
439 @param hvp: a dictionary with overridden hypervisor parameters
440 @type bep: dict or None
441 @param bep: a dictionary with overridden backend parameters
442 @type osp: dict or None
443 @param osp: a dictionary with overridden os parameters
445 @return: the instance dict, with the hvparams filled with the
449 idict = instance.ToDict()
450 cluster = self._cfg.GetClusterInfo()
451 idict["hvparams"] = cluster.FillHV(instance)
453 idict["hvparams"].update(hvp)
454 idict["beparams"] = cluster.FillBE(instance)
456 idict["beparams"].update(bep)
457 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
459 idict["osparams"].update(osp)
460 for nic in idict["nics"]:
461 nic['nicparams'] = objects.FillDict(
462 cluster.nicparams[constants.PP_DEFAULT],
466 def _ConnectList(self, client, node_list, call, read_timeout=None):
467 """Helper for computing node addresses.
469 @type client: L{ganeti.rpc.Client}
470 @param client: a C{Client} instance
471 @type node_list: list
472 @param node_list: the node list we should connect
474 @param call: the name of the remote procedure call, for filling in
475 correctly any eventual offline nodes' results
476 @type read_timeout: int
477 @param read_timeout: overwrites the default read timeout for the
481 all_nodes = self._cfg.GetAllNodesInfo()
485 for node in node_list:
486 if node in all_nodes:
487 if all_nodes[node].offline:
488 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
490 val = all_nodes[node].primary_ip
493 addr_list.append(val)
494 name_list.append(node)
496 client.ConnectList(name_list, address_list=addr_list,
497 read_timeout=read_timeout)
500 def _ConnectNode(self, client, node, call, read_timeout=None):
501 """Helper for computing one node's address.
503 @type client: L{ganeti.rpc.Client}
504 @param client: a C{Client} instance
506 @param node: the node we should connect
508 @param call: the name of the remote procedure call, for filling in
509 correctly any eventual offline nodes' results
510 @type read_timeout: int
511 @param read_timeout: overwrites the default read timeout for the
515 node_info = self._cfg.GetNodeInfo(node)
516 if node_info is not None:
517 if node_info.offline:
518 return RpcResult(node=node, offline=True, call=call)
519 addr = node_info.primary_ip
522 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
524 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525 """Helper for making a multi-node call
528 body = serializer.DumpJson(args, indent=False)
529 c = Client(procedure, body, self.port)
530 skip_dict = self._ConnectList(c, node_list, procedure,
531 read_timeout=read_timeout)
532 skip_dict.update(c.GetResults())
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
    """Performs a multi-node call without needing a configuration object.

    The arguments are serialized to JSON and sent to the node daemon port
    of every node in C{node_list}.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node_list: names of the nodes to contact
    @param procedure: name of the remote procedure
    @param args: arguments for the remote procedure
    @param address_list: optional node addresses, parallel to C{node_list}
    @param read_timeout: optional override of the default read timeout
    @return: the value of C{Client.GetResults()}

    """
    payload = serializer.DumpJson(args, indent=False)
    noded_port = netutils.GetDaemonPort(constants.NODED)
    client = Client(procedure, payload, noded_port)
    client.ConnectList(node_list, address_list=address_list,
                       read_timeout=read_timeout)
    return client.GetResults()
547 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548 """Helper for making a single-node call
551 body = serializer.DumpJson(args, indent=False)
552 c = Client(procedure, body, self.port)
553 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
555 # we did connect, node is not offline
556 result = c.GetResults()[node]
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
    """Performs a single-node call without needing a configuration object.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node: name of the node to contact
    @param procedure: name of the remote procedure
    @param args: arguments for the remote procedure
    @param read_timeout: optional override of the default read timeout
    @return: the entry for C{node} taken from C{Client.GetResults()}

    """
    payload = serializer.DumpJson(args, indent=False)
    noded_port = netutils.GetDaemonPort(constants.NODED)
    client = Client(procedure, payload, noded_port)
    client.ConnectNode(node, read_timeout=read_timeout)
    results = client.GetResults()
    return results[node]
571 """Compresses a string for transport over RPC.
573 Small amounts of data are not compressed.
578 @return: Encoded data to send
581 # Small amounts of data are not compressed
583 return (constants.RPC_ENCODING_NONE, data)
585 # Compress with zlib and encode in base64
586 return (constants.RPC_ENCODING_ZLIB_BASE64,
587 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
    """Lists the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the volume group to inspect
    @return: the value returned by L{_MultiNodeCall}

    """
    rpc_args = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
    """Retrieves the volume group list.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the value returned by L{_MultiNodeCall}

    """
    no_args = []
    return self._MultiNodeCall(node_list, "vg_list", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Retrieves the list of storage units.

    This is a multi-node call. All parameters other than C{node_list} are
    forwarded to the nodes unchanged, in signature order.

    @param node_list: the nodes to query
    @return: the value returned by L{_MultiNodeCall}

    """
    rpc_args = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Modifies a storage unit.

    This is a single-node call. All parameters other than C{node} are
    forwarded to the node unchanged, in signature order.

    @param node: the node to contact
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
    """Runs an operation on a storage unit.

    This is a single-node call. All parameters other than C{node} are
    forwarded to the node unchanged, in signature order.

    @param node: the node to contact
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
    """Asks a node whether it has all the given bridges.

    The remote check covers every bridge in C{bridges_list}, so that an
    instance using interfaces on those bridges can be started there.

    This is a single-node call.

    @param node: the node to contact
    @param bridges_list: the bridges that must be present
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
    """Starts an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted through L{_InstDict}
    @param hvp: hypervisor parameter overrides handed to L{_InstDict}
    @param bep: backend parameter overrides handed to L{_InstDict}
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance, hvp=hvp, bep=bep)]
    return self._SingleNodeCall(node, "instance_start", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
    """Stops an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, converted through L{_InstDict}
    @param timeout: forwarded to the node unchanged
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance), timeout]
    return self._SingleNodeCall(node, "instance_shutdown", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
    """Collects the information needed to prepare an instance migration.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "migration_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
    """Prepares a node to accept an instance.

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @param target: target hostname (usually ip address) (on the node itself)
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance), info, target]
    return self._SingleNodeCall(node, "accept_instance", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
    """Finalizes any target-node specific migration operation.

    Called after a successful migration as well as after a failed one (in
    which case the migration should be aborted).

    This is a single-node call.

    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance), info, success]
    return self._SingleNodeCall(node, "finalize_migration", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
    """Migrates an instance.

    This is a single-node call.

    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @param target: the target node name
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance), target, live]
    return self._SingleNodeCall(node, "instance_migrate", rpc_args)
750 @_RpcTimeout(_TMO_NORMAL)
751 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
752 """Reboots an instance.
754 This is a single-node call.
757 return self._SingleNodeCall(node, "instance_reboot",
758 [self._InstDict(inst), reboot_type,
761 @_RpcTimeout(_TMO_1DAY)
762 def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
763 """Installs an OS on the given instance.
765 This is a single-node call.
768 return self._SingleNodeCall(node, "instance_os_add",
769 [self._InstDict(inst, osp=osparams),
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
    """Runs the OS rename script for an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, converted through L{_InstDict}
    @param old_name: forwarded to the node unchanged
    @param debug: forwarded to the node unchanged
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(inst), old_name, debug]
    return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
    """Fetches information about a single instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @param hname: the hypervisor type of the instance
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
    """Asks a node whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "instance_migratable", rpc_args)
813 @_RpcTimeout(_TMO_URGENT)
814 def call_all_instances_info(self, node_list, hypervisor_list):
815 """Returns information about all instances on the given nodes.
817 This is a multi-node call.
819 @type node_list: list
820 @param node_list: the list of nodes to query
821 @type hypervisor_list: list
822 @param hypervisor_list: the hypervisors to query for instances
825 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
    """Fetches the list of running instances from the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances
    @return: the value returned by L{_MultiNodeCall}

    """
    rpc_args = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", rpc_args)
842 @_RpcTimeout(_TMO_FAST)
843 def call_node_tcp_ping(self, node, source, target, port, timeout,
845 """Do a TcpPing on the remote node
847 This is a single-node call.
850 return self._SingleNodeCall(node, "node_tcp_ping",
851 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
    """Asks a node whether it has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Retrieves node information.

    The nodes report memory information as well as volume group size and
    free space.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask
    @return: the value returned by L{_MultiNodeCall}

    """
    rpc_args = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_etc_hosts_modify(self, node, mode, name, ip):
    """Modifies the hosts file entry for a host name.

    @param node: The node to call
    @param mode: The mode to operate. Currently "add" or "remove"
    @param name: The host name to be modified
    @param ip: The ip of the entry (just valid if mode is "add")
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [mode, name, ip]
    return self._SingleNodeCall(node, "etc_hosts_modify", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Requests verification of the given parameters.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param checkdict: forwarded to the nodes unchanged
    @param cluster_name: forwarded to the nodes unchanged
    @return: the value returned by L{_MultiNodeCall}

    """
    rpc_args = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
    """Tells a node to activate itself as a master.

    This is a single-node call.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node: the node to contact
    @param start_daemons: forwarded to the node unchanged
    @param no_voting: forwarded to the node unchanged
    @return: the value returned by L{_StaticSingleNodeCall}

    """
    rpc_args = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
    """Tells a node to demote itself from master status.

    This is a single-node call.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node: the node to contact
    @param stop_daemons: forwarded to the node unchanged
    @return: the value returned by L{_StaticSingleNodeCall}

    """
    rpc_args = [stop_daemons]
    return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
    """Queries master info.

    This is a multi-node call.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node_list: the nodes to query
    @return: the value returned by L{_StaticMultiNodeCall}

    """
    # TODO: should this method query down nodes?
    no_args = []
    return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
    """Queries the node version.

    This is a multi-node call.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node_list: the nodes to query
    @return: the value returned by L{_StaticMultiNodeCall}

    """
    no_args = []
    return cls._StaticMultiNodeCall(node_list, "version", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Requests creation of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device; sent as C{bdev.ToDict()}
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_blockdev_wipe(self, node, bdev, offset, size):
    """Requests a wipe of a block device at a given offset and size.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device; sent as C{bdev.ToDict()}
    @param offset: forwarded to the node unchanged
    @param size: forwarded to the node unchanged
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [bdev.ToDict(), offset, size]
    return self._SingleNodeCall(node, "blockdev_wipe", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
    """Requests removal of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device; sent as C{bdev.ToDict()}
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
    """Requests a rename of the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param devlist: iterable of (device, uid) pairs; each device is sent
        as C{device.ToDict()}
    @return: the value returned by L{_SingleNodeCall}

    """
    # The list of pairs itself is the RPC argument list, i.e. it is not
    # wrapped in a further list
    renames = [(device.ToDict(), uid) for device, uid in devlist]
    return self._SingleNodeCall(node, "blockdev_rename", renames)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_pause_resume_sync(self, node, disks, pause):
    """Requests a pause/resume of the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param disks: the block devices; each is sent as C{ToDict()} output
    @param pause: forwarded to the node unchanged
    @return: the value returned by L{_SingleNodeCall}

    """
    disk_dicts = [bdev.ToDict() for bdev in disks]
    rpc_args = [disk_dicts, pause]
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
    """Requests assembling of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the block device; sent as C{disk.ToDict()}
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [disk.ToDict(), owner, on_primary, idx]
    return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
    """Requests shutdown of a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the block device; sent as C{disk.ToDict()}
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_shutdown", rpc_args)
1021 @_RpcTimeout(_TMO_NORMAL)
1022 def call_blockdev_addchildren(self, node, bdev, ndevs):
1023 """Request adding a list of children to a (mirroring) device.
1025 This is a single-node call.
1028 return self._SingleNodeCall(node, "blockdev_addchildren",
1030 [disk.ToDict() for disk in ndevs]])
1032 @_RpcTimeout(_TMO_NORMAL)
1033 def call_blockdev_removechildren(self, node, bdev, ndevs):
1034 """Request removing a list of children from a (mirroring) device.
1036 This is a single-node call.
1039 return self._SingleNodeCall(node, "blockdev_removechildren",
1041 [disk.ToDict() for disk in ndevs]])
1043 @_RpcTimeout(_TMO_NORMAL)
1044 def call_blockdev_getmirrorstatus(self, node, disks):
1045 """Request status of a (mirroring) device.
1047 This is a single-node call.
1050 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1051 [dsk.ToDict() for dsk in disks])
1052 if not result.fail_msg:
1053 result.payload = [objects.BlockDevStatus.FromDict(i)
1054 for i in result.payload]
1057 @_RpcTimeout(_TMO_NORMAL)
1058 def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1059 """Request status of (mirroring) devices from multiple nodes.
1061 This is a multi-node call.
1064 result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1065 [dict((name, [dsk.ToDict() for dsk in disks])
1066 for name, disks in node_disks.items())])
1067 for nres in result.values():
1071 for idx, (success, status) in enumerate(nres.payload):
1073 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1077 @_RpcTimeout(_TMO_NORMAL)
1078 def call_blockdev_find(self, node, disk):
1079 """Request identification of a given block device.
1081 This is a single-node call.
1084 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1085 if not result.fail_msg and result.payload is not None:
1086 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
    """Closes the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param instance_name: forwarded to the node unchanged
    @param disks: the block devices; each is sent as C{ToDict()} output
    @return: the value returned by L{_SingleNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
    """Retrieves the size of the given disks.

    This is a single-node call.

    @param node: the node to contact
    @param disks: the block devices; each is sent as C{ToDict()} output
    @return: the value returned by L{_SingleNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnects the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: forwarded to the nodes unchanged
    @param disks: the drbd devices; each is sent as C{ToDict()} output
    @return: the value returned by L{_MultiNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    rpc_args = [nodes_ip, disk_dicts]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Issues the C{drbd_attach_net} procedure for the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: forwarded to the nodes unchanged
    @param disks: the drbd devices; each is sent as C{ToDict()} output
    @param instance_name: forwarded to the nodes unchanged
    @param multimaster: forwarded to the nodes unchanged
    @return: the value returned by L{_MultiNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    rpc_args = [nodes_ip, disk_dicts, instance_name, multimaster]
    return self._MultiNodeCall(node_list, "drbd_attach_net", rpc_args)
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Waits until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param nodes_ip: forwarded to the nodes unchanged
    @param disks: the drbd devices; each is sent as C{ToDict()} output
    @return: the value returned by L{_MultiNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    rpc_args = [nodes_ip, disk_dicts]
    return self._MultiNodeCall(node_list, "drbd_wait_sync", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
    """Retrieves the drbd helper.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the value returned by L{_MultiNodeCall}

    """
    no_args = []
    return self._MultiNodeCall(node_list, "drbd_helper", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Uploads a local file to the given nodes.

    The file is read locally and passed through C{_Compress}. It is sent
    together with its mode, its owner and group (as looked up through
    C{runtime.GetEnts()}) and its access and modification times. The
    receiving node may refuse the operation for files it does not accept.

    This is a multi-node call.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @type file_name: str
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed
    @return: the value returned by L{_StaticMultiNodeCall}

    """
    compressed = cls._Compress(utils.ReadFile(file_name))
    file_stat = os.stat(file_name)
    ents = runtime.GetEnts()
    owner = ents.LookupUid(file_stat.st_uid)
    group = ents.LookupGid(file_stat.st_gid)
    rpc_args = [file_name, compressed, file_stat.st_mode, owner, group,
                file_stat.st_atime, file_stat.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file", rpc_args,
                                    address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
    """Writes ssconf files.

    This is a multi-node call.

    NOTE(review): the receiver is C{cls}; a classmethod decorator is
    presumably applied outside the visible span -- confirm.

    @param node_list: the nodes to contact
    @param values: forwarded to the nodes unchanged
    @return: the value returned by L{_StaticMultiNodeCall}

    """
    rpc_args = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_run_oob(self, node, oob_program, command, remote_node, timeout):
    """Issues the C{run_oob} procedure on a node.

    This is a single-node call. All parameters other than C{node} are
    forwarded to the node unchanged, in signature order.

    @param node: the node to contact
    @return: the value returned by L{_SingleNodeCall}

    """
    rpc_args = [oob_program, command, remote_node, timeout]
    return self._SingleNodeCall(node, "run_oob", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
    """Requests a diagnose of OS definitions.

    This is a multi-node call.

    @param node_list: the nodes to query
    @return: the value returned by L{_MultiNodeCall}

    """
    no_args = []
    return self._MultiNodeCall(node_list, "os_diagnose", no_args)
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
    """Returns an OS definition.

    This is a single-node call.

    @param node: node on which the "os_get" RPC is run
    @param name: sent as the single RPC argument
    @return: the RPC result; when the call did not fail and the payload
        is a dict, the payload is replaced by the L{objects.OS} built
        from it

    """
    result = self._SingleNodeCall(node, "os_get", [name])
    # Failed calls and non-dict payloads are handed back untouched
    if not result.fail_msg and isinstance(result.payload, dict):
        result.payload = objects.OS.FromDict(result.payload)
    return result
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
    """Run a validation routine for a given OS.

    This is a multi-node call.

    @param required: first RPC argument, forwarded unchanged
    @param nodes: nodes on which the "os_validate" RPC is run
    @param name: second RPC argument, forwarded unchanged
    @param checks: third RPC argument, forwarded unchanged
    @param params: fourth RPC argument, forwarded unchanged

    """
    rpc_args = [required, name, checks, params]
    return self._MultiNodeCall(nodes, "os_validate", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: nodes on which the "hooks_runner" RPC is run
    @param hpath: first RPC argument, forwarded unchanged
    @param phase: second RPC argument, forwarded unchanged
    @param env: a dictionary with the environment

    """
    return self._MultiNodeCall(node_list, "hooks_runner",
                               [hpath, phase, env])
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: node on which the "iallocator_runner" RPC is run
    @param name: the iallocator name
    @param idata: the json-encoded input string

    """
    rpc_args = [name, idata]
    return self._SingleNodeCall(node, "iallocator_runner", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing the given block device.

    This is a single-node call.

    @param node: node on which the "blockdev_grow" RPC is run
    @param cf_bdev: object providing C{ToDict()}; sent in serialized form
    @param amount: second RPC argument, forwarded unchanged

    """
    bdev_dict = cf_bdev.ToDict()
    return self._SingleNodeCall(node, "blockdev_grow", [bdev_dict, amount])
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
    """Export a given disk to another node.

    This is a single-node call.

    @param node: node on which the "blockdev_export" RPC is run
    @param cf_bdev: object providing C{ToDict()}; sent in serialized form
    @param dest_node: second RPC argument, forwarded unchanged
    @param dest_path: third RPC argument, forwarded unchanged
    @param cluster_name: fourth RPC argument, forwarded unchanged

    """
    return self._SingleNodeCall(node, "blockdev_export",
                                [cf_bdev.ToDict(), dest_node, dest_path,
                                 cluster_name])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param node: node on which the "blockdev_snapshot" RPC is run
    @param cf_bdev: object providing C{ToDict()}; sent in serialized form

    """
    rpc_args = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param node: node on which the "finalize_export" RPC is run
    @param instance: instance object, encoded with C{self._InstDict}
    @param snap_disks: iterable whose elements are either booleans or
        objects providing C{ToDict()}

    """
    # Booleans are sent as they are; everything else is serialized
    flat_disks = [disk if isinstance(disk, bool) else disk.ToDict()
                  for disk in snap_disks]

    return self._SingleNodeCall(node, "finalize_export",
                                [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
    """Query the export information found in a given path.

    This is a single-node call.

    @param node: node on which the "export_info" RPC is run
    @param path: sent as the single RPC argument

    """
    rpc_args = [path]
    return self._SingleNodeCall(node, "export_info", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
    """Get the list of stored exports.

    This is a multi-node call.

    @param node_list: nodes on which the "export_list" RPC is run

    """
    rpc_args = []
    return self._MultiNodeCall(node_list, "export_list", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
    """Request the removal of a given export.

    This is a single-node call.

    @param node: node on which the "export_remove" RPC is run
    @param export: sent as the single RPC argument

    """
    rpc_args = [export]
    return self._SingleNodeCall(node, "export_remove", rpc_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Requests a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti data
    dir.

    This is a single-node call.

    @param node: node on which the "node_leave_cluster" RPC is run
    @param modify_ssh_setup: sent as the single RPC argument

    """
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                     [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
    """Get all volumes on the given node(s).

    This is a multi-node call.

    @param node_list: nodes on which the "node_volumes" RPC is run

    """
    rpc_args = []
    return self._MultiNodeCall(node_list, "node_volumes", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: node on which the "node_demote_from_mc" RPC is run

    """
    rpc_args = []
    return self._SingleNodeCall(node, "node_demote_from_mc", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
    """Try to powercycle a node.

    This is a single-node call.

    @param node: node on which the "node_powercycle" RPC is run
    @param hypervisor: sent as the single RPC argument

    """
    rpc_args = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", rpc_args)
def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: nodes on which the "test_delay" RPC is run
    @param duration: sent as the single RPC argument; the read timeout
        of the call is C{duration + 5}, truncated to an integer

    """
    # Allow the RPC a few seconds more than the requested delay
    read_timeout = int(duration + 5)
    return self._MultiNodeCall(node_list, "test_delay", [duration],
                               read_timeout=read_timeout)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
    """Create the given file storage directory.

    This is a single-node call.

    @param node: node on which the "file_storage_dir_create" RPC is run
    @param file_storage_dir: sent as the single RPC argument

    """
    return self._SingleNodeCall(node, "file_storage_dir_create",
                                [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Remove the given file storage directory.

    This is a single-node call.

    @param node: node on which the "file_storage_dir_remove" RPC is run
    @param file_storage_dir: sent as the single RPC argument

    """
    return self._SingleNodeCall(node, "file_storage_dir_remove",
                                [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename a file storage directory.

    This is a single-node call.

    @param node: node on which the "file_storage_dir_rename" RPC is run
    @param old_file_storage_dir: first RPC argument, forwarded unchanged
    @param new_file_storage_dir: second RPC argument, forwarded unchanged

    """
    rpc_args = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update job queue.

    This is a multi-node call.

    @param node_list: nodes on which the "jobqueue_update" RPC is run
    @param address_list: passed on as the C{address_list} keyword of
        C{_StaticMultiNodeCall}
    @param file_name: first RPC argument, forwarded unchanged
    @param content: data sent as second RPC argument after being passed
        through C{cls._Compress}

    """
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                    [file_name, cls._Compress(content)],
                                    address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
    """Purge job queue.

    This is a single-node call.

    @param node: node on which the "jobqueue_purge" RPC is run

    """
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call.

    @param node_list: nodes on which the "jobqueue_rename" RPC is run
    @param address_list: passed on as the C{address_list} keyword of
        C{_StaticMultiNodeCall}
    @param rename: used as-is as the RPC argument list (it is not
        wrapped in another list)

    """
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                    address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Validate the hypervisor params.

    This is a multi-node call.

    The parameters actually sent are the result of C{objects.FillDict}
    applied to the cluster's parameters for C{hvname} (an empty dict if
    the cluster has none) and C{hvparams}.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated

    """
    cluster = self._cfg.GetClusterInfo()
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
    """Create a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: node on which the "x509_cert_create" RPC is run
    @param validity: Validity in seconds

    """
    rpc_args = [validity]
    return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
    """Remove an X509 certificate.

    This is a single-node call.

    @param node: node on which the "x509_cert_remove" RPC is run
    @param name: Certificate name

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
    """Starts a listener for an import.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: object providing C{ToDict()}; sent in serialized form
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param dest: sent as-is and also used, together with C{dest_args},
        as input to C{_EncodeImportExportIO}
    @param dest_args: encoded through C{_EncodeImportExportIO}

    """
    return self._SingleNodeCall(node, "import_start",
                                [opts.ToDict(),
                                 self._InstDict(instance), dest,
                                 _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
    """Start an export daemon.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: object providing C{ToDict()}; sent in serialized form
    @param host: second RPC argument, forwarded unchanged
    @param port: third RPC argument, forwarded unchanged
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param source: sent as-is and also used, together with
        C{source_args}, as input to C{_EncodeImportExportIO}
    @param source_args: encoded through C{_EncodeImportExportIO}

    """
    rpc_args = [
        opts.ToDict(),
        host,
        port,
        self._InstDict(instance),
        source,
        _EncodeImportExportIO(source, source_args),
    ]
    return self._SingleNodeCall(node, "export_start", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
    """Gets the status of an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type names: List of strings
    @param names: Import/export names
    @rtype: List of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or
        None if a status couldn't be retrieved

    """
    result = self._SingleNodeCall(node, "impexp_status", [names])

    if not result.fail_msg:
        # Entries without a status stay None; the others are turned back
        # into objects
        result.payload = [
            None if i is None else objects.ImportExportStatus.FromDict(i)
            for i in result.payload
        ]

    return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
    """Abort an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param name: Import/export name

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_abort", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
    """Clean up after an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param name: Import/export name

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)